package org.apache.druid.server.coordinator.rules;

import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.BalancerStrategy;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ReplicationThrottler;
import org.apache.druid.server.coordinator.SegmentReplicantLookup;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

/* loaded from: input_file:org/apache/druid/server/coordinator/rules/LoadRule.class */
public abstract class LoadRule implements Rule {
    /** Logger for progress messages and for the alerts emitted when a tier has no servers. */
    private static final EmittingLogger log = new EmittingLogger(LoadRule.class);

    /** Tiered stat name under which the number of segment copies queued for loading is reported. */
    static final String ASSIGNED_COUNT = "assignedCount";

    /** Tiered stat name under which the number of segment copies queued for dropping is reported. */
    static final String DROPPED_COUNT = "droppedCount";

    /** Tiered stat name that accumulates segment size multiplied by the tier's target replica count. */
    public static final String REQUIRED_CAPACITY = "requiredCapacity";

    // The three collections below are scratch state for a single run(...) call: run(...) fills them
    // at the start and clears them in its finally block.
    // NOTE(review): nothing here synchronizes access, so overlapping run(...) calls on the same
    // instance would interfere with each other - confirm the coordinator never does that.

    /** Tier name -> replica count requested by this rule, copied from getTieredReplicants(). */
    private final Object2IntMap<String> targetReplicants = new Object2IntOpenHashMap<>();

    /** Tier name -> replica count reported by the replicant lookup for the segment being processed. */
    private final Object2IntMap<String> currentReplicants = new Object2IntOpenHashMap<>();

    /**
     * Tier name -> server chosen by the balancer while looking for a primary but not used as the
     * primary; replica assignment consumes these entries instead of asking the balancer again.
     */
    private final Map<String, ServerHolder> strategyCache = new HashMap<>();

    /**
     * Applies this rule to a single segment.
     *
     * <p>The per-tier target counts (from {@link #getTieredReplicants()}) and the current counts
     * (from the segment replicant lookup) are snapshotted into instance scratch maps, then the
     * segment is assigned and dropped as needed. Finally, for every target tier, the segment size
     * multiplied by that tier's target replica count is added to the {@link #REQUIRED_CAPACITY} stat.
     * The scratch maps are cleared in all cases, including when an exception propagates.
     *
     * @param druidCoordinator not used by this implementation
     * @param druidCoordinatorRuntimeParams source of the replicant lookup, cluster view, balancer and throttler
     * @param dataSegment the segment the rule is applied to
     * @return the stats collected while assigning and dropping
     */
    @Override
    public CoordinatorStats run(DruidCoordinator druidCoordinator, DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams, DataSegment dataSegment) {
        try {
            this.targetReplicants.putAll(getTieredReplicants());
            this.currentReplicants.putAll(druidCoordinatorRuntimeParams.getSegmentReplicantLookup().getClusterTiers(dataSegment.getId()));

            final CoordinatorStats stats = new CoordinatorStats();
            assign(druidCoordinatorRuntimeParams, dataSegment, stats);
            drop(druidCoordinatorRuntimeParams, dataSegment, stats);

            // The same tier name must be used both as the stat key and as the lookup key.
            for (String tier : this.targetReplicants.keySet()) {
                stats.addToTieredStat(REQUIRED_CAPACITY, tier, dataSegment.getSize() * this.targetReplicants.getInt(tier));
            }
            return stats;
        } finally {
            this.targetReplicants.clear();
            this.currentReplicants.clear();
            this.strategyCache.clear();
        }
    }

    /**
     * Reports whether this rule can cause segments to be loaded.
     *
     * @return always {@code true} for load rules
     */
    @Override
    public boolean canLoadSegments() {
        return true;
    }

    /**
     * Adds this segment's replica shortfall to the per-tier, per-datasource under-replication counts.
     *
     * <p>For each tier in {@link #getTieredReplicants()}, the shortfall is the rule's replica count
     * minus the loaded replica count reported by the lookup, floored at zero. It is added to the
     * entry for the segment's datasource inside that tier's map, which is created on first use.
     *
     * @param map tier name -> (datasource name -> accumulated shortfall); updated in place
     * @param segmentReplicantLookup source of the loaded replica count per tier
     * @param dataSegment the segment being accounted for
     */
    @Override
    public void updateUnderReplicated(Map<String, Object2LongMap<String>> map, SegmentReplicantLookup segmentReplicantLookup, DataSegment dataSegment) {
        getTieredReplicants().forEach((tier, ruleReplicants) -> {
            // The mapping function's parameter needs its own name: a lambda parameter may not
            // redeclare a parameter of the enclosing lambda.
            final Object2LongMap<String> shortfallPerDataSource = map.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
            // NOTE(review): addTo is invoked on the Object2LongMap-typed value exactly as before;
            // confirm the fastutil version in use declares it on that type.
            shortfallPerDataSource.addTo(
                dataSegment.getDataSource(),
                Math.max(ruleReplicants.intValue() - segmentReplicantLookup.getLoadedReplicants(dataSegment.getId(), tier), 0)
            );
        });
    }

    /**
     * Queues loads for the segment.
     *
     * <p>When the segment already has replicas (the current-replicant snapshot is non-empty or the
     * lookup reports a positive total), only replicas are assigned. Otherwise a primary is chosen
     * first; if none can be chosen nothing is assigned. After a primary is chosen, the remaining
     * replicas for the primary's tier are assigned to servers other than the primary, the tier's
     * {@link #ASSIGNED_COUNT} is recorded (primary included), and the other tiers are handled.
     *
     * @param druidCoordinatorRuntimeParams runtime parameters for this coordinator run
     * @param dataSegment the segment to assign
     * @param coordinatorStats stats object that receives the assigned counts
     */
    private void assign(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams, DataSegment dataSegment, CoordinatorStats coordinatorStats) {
        final int knownReplicas = druidCoordinatorRuntimeParams.getSegmentReplicantLookup().getTotalReplicants(dataSegment.getId());
        final boolean alreadyHasReplica = !this.currentReplicants.isEmpty() || knownReplicas > 0;
        if (alreadyHasReplica) {
            assignReplicas(druidCoordinatorRuntimeParams, dataSegment, coordinatorStats, null);
            return;
        }

        final ServerHolder primary = assignPrimary(druidCoordinatorRuntimeParams, dataSegment);
        if (primary != null) {
            final String primaryTier = primary.getServer().getTier();

            // Start at one: the primary itself already counts as an assigned copy in this tier.
            int assignedInPrimaryTier = 1;
            assignedInPrimaryTier += assignReplicasForTier(
                primaryTier,
                this.targetReplicants.getOrDefault(primaryTier, 0),
                1,
                druidCoordinatorRuntimeParams,
                createLoadQueueSizeLimitingPredicate(druidCoordinatorRuntimeParams).and(holder -> !holder.equals(primary)),
                dataSegment
            );
            coordinatorStats.addToTieredStat(ASSIGNED_COUNT, primaryTier, assignedInPrimaryTier);

            // The primary's tier has just been handled, so it is skipped here.
            assignReplicas(druidCoordinatorRuntimeParams, dataSegment, coordinatorStats, primaryTier);
        }
    }

    /**
     * Builds the server filter derived from the dynamic config's
     * {@code maxSegmentsInNodeLoadingQueue} setting.
     *
     * @param druidCoordinatorRuntimeParams runtime parameters carrying the dynamic config
     * @return a predicate accepting any non-null server when the setting is zero or negative;
     *         otherwise one accepting non-null servers whose queue holds fewer segments than the setting
     */
    private static Predicate<ServerHolder> createLoadQueueSizeLimitingPredicate(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams) {
        final int queueLimit = druidCoordinatorRuntimeParams.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
        if (queueLimit <= 0) {
            return Objects::nonNull;
        }
        return holder -> holder != null && holder.getNumberOfSegmentsInQueue() < queueLimit;
    }

    /**
     * Lists the servers of a tier that are not decommissioning and that satisfy {@code predicate}.
     *
     * <p>When the cluster has no entry for the tier, an alert is emitted and an immutable empty
     * list is returned. Otherwise the list is freshly collected for this call.
     *
     * @param str the tier name
     * @param druidCluster the cluster view to read the tier's historicals from
     * @param predicate additional filter, applied only to servers that are not decommissioning
     * @return the matching servers, possibly empty
     */
    private static List<ServerHolder> getFilteredHolders(String str, DruidCluster druidCluster, Predicate<ServerHolder> predicate) {
        final NavigableSet<ServerHolder> tierHolders = druidCluster.getHistoricalsByTier(str);
        if (tierHolders == null) {
            log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", str).emit();
            return Collections.emptyList();
        }
        // The predicate must be typed: with a raw Predicate the lambda parameter is an Object and
        // isDecommissioning() cannot be resolved.
        final Predicate<ServerHolder> notDecommissioning = holder -> !holder.isDecommissioning();
        return tierHolders.stream()
            .filter(notDecommissioning.and(predicate))
            .collect(Collectors.toList());
    }

    /**
     * Chooses a server for the first copy of the segment and queues the load on it.
     *
     * <p>For every tier with a positive target count, the balancer is asked for a server among the
     * tier's eligible holders. Each pick is remembered in the strategy cache under its tier, and the
     * pick whose server has the highest priority wins (the earliest pick wins ties). The winner's
     * cache entry is removed again so that only the unused picks remain for replica assignment.
     *
     * @param druidCoordinatorRuntimeParams runtime parameters for this coordinator run
     * @param dataSegment the segment to load
     * @return the server the load was queued on, or {@code null} when no tier produced a candidate
     */
    @Nullable
    private ServerHolder assignPrimary(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams, DataSegment dataSegment) {
        ServerHolder chosen = null;
        for (Object2IntMap.Entry<String> tierEntry : this.targetReplicants.object2IntEntrySet()) {
            final int wantedReplicas = tierEntry.getIntValue();
            if (wantedReplicas <= 0) {
                continue;
            }

            final String tier = tierEntry.getKey();
            final String noCapacityMessage = StringUtils.format(
                "No available [%s] servers or node capacity to assign primary segment[%s]! Expected Replicants[%d]",
                tier,
                dataSegment.getId(),
                wantedReplicas
            );
            final List<ServerHolder> candidates = getFilteredHolders(
                tier,
                druidCoordinatorRuntimeParams.getDruidCluster(),
                createLoadQueueSizeLimitingPredicate(druidCoordinatorRuntimeParams)
            );

            // The balancer is only consulted when there is at least one candidate.
            final ServerHolder pick = candidates.isEmpty()
                ? null
                : druidCoordinatorRuntimeParams.getBalancerStrategy().findNewSegmentHomeReplicator(dataSegment, candidates);
            if (pick == null) {
                log.warn(noCapacityMessage);
                continue;
            }

            this.strategyCache.put(tier, pick);
            final boolean higherPriority =
                chosen == null || pick.getServer().getPriority() > chosen.getServer().getPriority();
            if (higherPriority) {
                chosen = pick;
            }
        }

        if (chosen == null) {
            return null;
        }

        // The winner becomes the primary, so it must not be reused as a cached replica target.
        this.strategyCache.remove(chosen.getServer().getTier());
        log.info(
            "Assigning 'primary' for segment [%s] to server [%s] in tier [%s]",
            dataSegment.getId(),
            chosen.getServer().getName(),
            chosen.getServer().getTier()
        );
        chosen.getPeon().loadSegment(dataSegment, null);
        return chosen;
    }

    /**
     * Assigns replicas in every target tier except {@code str} and records the per-tier
     * {@link #ASSIGNED_COUNT}.
     *
     * @param druidCoordinatorRuntimeParams runtime parameters for this coordinator run
     * @param dataSegment the segment to replicate
     * @param coordinatorStats stats object that receives the assigned counts
     * @param str name of a tier to skip, or {@code null} to process all tiers
     */
    private void assignReplicas(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams, DataSegment dataSegment, CoordinatorStats coordinatorStats, @Nullable String str) {
        // Keep the whole entry: both the tier name and its target count are needed below.
        for (Object2IntMap.Entry<String> tierEntry : this.targetReplicants.object2IntEntrySet()) {
            final String tier = tierEntry.getKey();
            if (tier.equals(str)) {
                log.info("Skipping replica assignment for tier [%s]", tier);
                continue;
            }
            final int assigned = assignReplicasForTier(
                tier,
                tierEntry.getIntValue(),
                druidCoordinatorRuntimeParams.getSegmentReplicantLookup().getTotalReplicants(dataSegment.getId(), tier),
                druidCoordinatorRuntimeParams,
                createLoadQueueSizeLimitingPredicate(druidCoordinatorRuntimeParams),
                dataSegment
            );
            coordinatorStats.addToTieredStat(ASSIGNED_COUNT, tier, assigned);
        }
    }

    /**
     * Queues up to {@code i - i2} loads of the segment on servers of one tier.
     *
     * <p>Each round first asks the replication throttler whether another replicant may be created
     * in the tier, then takes the tier's cached pick from the strategy cache if there is one and
     * otherwise asks the balancer. The chosen server is removed from the candidate list so it is
     * not picked twice, the creation is registered with the throttler, and the load is queued with
     * a callback that unregisters it again.
     *
     * @param str the tier name
     * @param i target replica count for the tier
     * @param i2 replica count already present (or already assigned) in the tier
     * @param druidCoordinatorRuntimeParams runtime parameters for this coordinator run
     * @param predicate filter applied to the tier's servers, in addition to the decommissioning check
     * @param dataSegment the segment to replicate
     * @return the number of loads queued; 0 when nothing is missing or no server is eligible
     */
    private int assignReplicasForTier(String str, int i, int i2, DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams, Predicate<ServerHolder> predicate, DataSegment dataSegment) {
        final int missing = i - i2;
        if (missing <= 0) {
            return 0;
        }

        final String noCapacityMessage = StringUtils.format(
            "No available [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
            str,
            dataSegment.getId(),
            i
        );
        final List<ServerHolder> candidates = getFilteredHolders(str, druidCoordinatorRuntimeParams.getDruidCluster(), predicate);
        if (candidates.isEmpty()) {
            log.warn(noCapacityMessage);
            return 0;
        }

        final ReplicationThrottler throttler = druidCoordinatorRuntimeParams.getReplicationManager();
        int assigned = 0;
        while (assigned < missing) {
            if (!throttler.canCreateReplicant(str)) {
                log.info("Throttling replication for segment [%s] in tier [%s]", dataSegment.getId(), str);
                return assigned;
            }

            // Prefer the pick left over from primary selection; fall back to the balancer.
            final ServerHolder cached = this.strategyCache.remove(str);
            final ServerHolder target = cached != null
                ? cached
                : druidCoordinatorRuntimeParams.getBalancerStrategy().findNewSegmentHomeReplicator(dataSegment, candidates);
            if (target == null) {
                log.warn(noCapacityMessage);
                return assigned;
            }
            candidates.remove(target);

            final SegmentId segmentId = dataSegment.getId();
            throttler.registerReplicantCreation(str, segmentId, target.getServer().getHost());
            log.info(
                "Assigning 'replica' for segment [%s] to server [%s] in tier [%s]",
                dataSegment.getId(),
                target.getServer().getName(),
                target.getServer().getTier()
            );
            target.getPeon().loadSegment(dataSegment, () -> throttler.unregisterReplicantCreation(str, segmentId));
            assigned++;
        }
        return missing;
    }

    /**
     * Queues drops for copies of the segment that exceed the target count of their tier.
     *
     * <p>Nothing is dropped while {@link #loadingInProgress(DruidCluster)} reports that some tier is
     * still below its target. Otherwise each tier in the current-replicant snapshot is examined and
     * the number of drops queued there is added to {@link #DROPPED_COUNT}; a tier without holders
     * raises an alert and counts as zero.
     *
     * @param druidCoordinatorRuntimeParams runtime parameters for this coordinator run
     * @param dataSegment the segment to drop copies of
     * @param coordinatorStats stats object that receives the dropped counts
     */
    private void drop(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams, DataSegment dataSegment, CoordinatorStats coordinatorStats) {
        final DruidCluster cluster = druidCoordinatorRuntimeParams.getDruidCluster();
        if (loadingInProgress(cluster)) {
            log.info("Loading in progress, skipping drop until loading is complete");
            return;
        }

        for (Object2IntMap.Entry<String> tierEntry : this.currentReplicants.object2IntEntrySet()) {
            final String tier = tierEntry.getKey();
            final NavigableSet<ServerHolder> tierHolders = cluster.getHistoricalsByTier(tier);

            int dropped = 0;
            if (tierHolders == null) {
                log.makeAlert("No holders found for tier[%s]", tier).emit();
            } else {
                final int surplus = tierEntry.getIntValue() - this.targetReplicants.getOrDefault(tier, 0);
                if (surplus > 0) {
                    dropped = dropForTier(surplus, tierHolders, dataSegment, druidCoordinatorRuntimeParams.getBalancerStrategy());
                }
            }
            coordinatorStats.addToTieredStat(DROPPED_COUNT, tier, dropped);
        }
    }

    /**
     * Tells whether any target tier that exists in the cluster still has fewer current replicas
     * than this rule asks for.
     *
     * @param druidCluster the cluster view used to check that a tier exists
     * @return {@code true} as soon as one such tier is found, {@code false} otherwise
     */
    private boolean loadingInProgress(DruidCluster druidCluster) {
        for (Object2IntMap.Entry<String> tierEntry : this.targetReplicants.object2IntEntrySet()) {
            final String tier = tierEntry.getKey();
            if (!druidCluster.hasTier(tier)) {
                continue;
            }
            final int current = this.currentReplicants.getOrDefault(tier, 0);
            if (tierEntry.getIntValue() > current) {
                return true;
            }
        }
        return false;
    }

    /**
     * Queues up to {@code i} drops of the segment within one tier, taking decommissioning servers
     * before the remaining ones.
     *
     * <p>A warning is logged when fewer than {@code i} drops could be queued.
     *
     * @param i number of copies to drop
     * @param navigableSet the tier's servers; only those serving the segment are considered
     * @param dataSegment the segment to drop
     * @param balancerStrategy strategy that orders the servers to drop from
     * @return the number of drops actually queued
     */
    private static int dropForTier(int i, NavigableSet<ServerHolder> navigableSet, DataSegment dataSegment, BalancerStrategy balancerStrategy) {
        final Map<Boolean, TreeSet<ServerHolder>> servingByDecommissioning = navigableSet
            .stream()
            .filter(holder -> holder.isServingSegment(dataSegment))
            .collect(Collectors.partitioningBy(holder -> holder.isDecommissioning(), Collectors.toCollection(TreeSet::new)));
        final TreeSet<ServerHolder> decommissioningServers = servingByDecommissioning.get(true);
        final TreeSet<ServerHolder> activeServers = servingByDecommissioning.get(false);

        int stillToDrop = dropSegmentFromServers(balancerStrategy, dataSegment, decommissioningServers, i);
        if (stillToDrop > 0) {
            stillToDrop = dropSegmentFromServers(balancerStrategy, dataSegment, activeServers, stillToDrop);
        }
        if (stillToDrop != 0) {
            log.warn("I have no servers serving [%s]?", dataSegment.getId());
        }
        return i - stillToDrop;
    }

    /**
     * Walks the servers in the order given by the balancer strategy and queues a drop on each one
     * that still serves the segment, until {@code i} drops are queued or the servers run out.
     *
     * @param balancerStrategy strategy that supplies the drop order
     * @param dataSegment the segment to drop
     * @param navigableSet the servers offered to the strategy
     * @param i number of drops wanted
     * @return how many of the wanted drops could not be queued
     */
    private static int dropSegmentFromServers(BalancerStrategy balancerStrategy, DataSegment dataSegment, NavigableSet<ServerHolder> navigableSet, int i) {
        final Iterator<ServerHolder> dropOrder = balancerStrategy.pickServersToDrop(dataSegment, navigableSet);
        int remaining = i;
        // The count is tested first so the iterator is not advanced once enough drops are queued.
        while (remaining > 0 && dropOrder.hasNext()) {
            final ServerHolder holder = dropOrder.next();
            if (!holder.isServingSegment(dataSegment)) {
                log.warn("Server [%s] is no longer serving segment [%s], skipping drop.", holder.getServer().getName(), dataSegment.getId());
                continue;
            }
            log.info(
                "Dropping segment [%s] on server [%s] in tier [%s]",
                dataSegment.getId(),
                holder.getServer().getName(),
                holder.getServer().getTier()
            );
            holder.getPeon().dropSegment(dataSegment, null);
            remaining--;
        }
        return remaining;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Checks a tier-to-replica-count map for use in a load rule.
     *
     * @param map tier name -> replica count
     * @throws IAE when the map is empty, or when any count is {@code null} or negative
     */
    public static void validateTieredReplicants(Map<String, Integer> map) {
        if (map.isEmpty()) {
            throw new IAE("A rule with empty tiered replicants is invalid");
        }
        for (Integer replicants : map.values()) {
            if (replicants == null) {
                throw new IAE("Replicant value cannot be empty");
            }
            if (replicants.intValue() < 0) {
                throw new IAE("Replicant value [%d] is less than 0, which is not allowed", replicants);
            }
        }
    }

    /**
     * Supplies the replica count this rule asks for in each tier.
     *
     * <p>{@code run} copies this map into its target-replicant snapshot and
     * {@code updateUnderReplicated} iterates it, reading each key as a tier name.
     *
     * @return tier name -> requested replica count
     */
    public abstract Map<String, Integer> getTieredReplicants();

    /**
     * Returns a replica count for the given key.
     *
     * <p>NOTE(review): nothing in this class calls this method; presumably {@code str} is a tier
     * name and the result is that tier's requested replica count - confirm against the subclasses.
     *
     * @param str presumably a tier name
     * @return the replica count for {@code str}
     */
    public abstract int getNumReplicants(String str);
}
