package org.apache.druid.server.coordinator.duty;

import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.BalancerSegmentHolder;
import org.apache.druid.server.coordinator.BalancerStrategy;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.LoadPeonCallback;
import org.apache.druid.server.coordinator.LoadQueuePeon;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

/* loaded from: input_file:org/apache/druid/server/coordinator/duty/BalanceSegments.class */
public class BalanceSegments implements CoordinatorDuty {
    protected static final EmittingLogger log = new EmittingLogger(BalanceSegments.class);
    protected final DruidCoordinator coordinator;
    protected final Map<String, ConcurrentHashMap<SegmentId, BalancerSegmentHolder>> currentlyMovingSegments = new HashMap();
    private static final int DEFAULT_RESERVOIR_SIZE = 1;

    public BalanceSegments(DruidCoordinator druidCoordinator) {
        this.coordinator = druidCoordinator;
    }

    protected void reduceLifetimes(String str) {
        for (BalancerSegmentHolder balancerSegmentHolder : this.currentlyMovingSegments.get(str).values()) {
            balancerSegmentHolder.reduceLifetime();
            if (balancerSegmentHolder.getLifetime() <= 0) {
                log.makeAlert("[%s]: Balancer move segments queue has a segment stuck", str).addData("segment", balancerSegmentHolder.getSegment().getId()).addData("server", balancerSegmentHolder.getFromServer().getMetadata()).emit();
            }
        }
    }

    @Override // org.apache.druid.server.coordinator.duty.CoordinatorDuty
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams) {
        CoordinatorStats coordinatorStats = new CoordinatorStats();
        druidCoordinatorRuntimeParams.getDruidCluster().getHistoricals().forEach((str, navigableSet) -> {
            balanceTier(druidCoordinatorRuntimeParams, str, navigableSet, coordinatorStats);
        });
        return druidCoordinatorRuntimeParams.buildFromExisting().withCoordinatorStats(coordinatorStats).build();
    }

    private void balanceTier(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams, String str, SortedSet<ServerHolder> sortedSet, CoordinatorStats coordinatorStats) {
        if (druidCoordinatorRuntimeParams.getUsedSegments().size() == 0) {
            log.info("Metadata segments are not available. Cannot balance.", new Object[0]);
            return;
        }
        this.currentlyMovingSegments.computeIfAbsent(str, str2 -> {
            return new ConcurrentHashMap();
        });
        if (!this.currentlyMovingSegments.get(str).isEmpty()) {
            reduceLifetimes(str);
            log.info("[%s]: Still waiting on %,d segments to be moved. Skipping balance.", str, Integer.valueOf(this.currentlyMovingSegments.get(str).size()));
            return;
        }
        Map map = (Map) sortedSet.stream().collect(Collectors.partitioningBy((v0) -> {
            return v0.isDecommissioning();
        }));
        List<ServerHolder> list = (List) map.get(true);
        List<ServerHolder> list2 = (List) map.get(false);
        log.info("Found %d active servers, %d decommissioning servers", Integer.valueOf(list2.size()), Integer.valueOf(list.size()));
        if ((list.isEmpty() && list2.size() <= 1) || list2.isEmpty()) {
            log.warn("[%s]: insufficient active servers. Cannot balance.", str);
            return;
        }
        int i = 0;
        Iterator<ServerHolder> it2 = sortedSet.iterator();
        while (it2.hasNext()) {
            i += it2.next().getServer().getNumSegments();
        }
        if (i == 0) {
            log.info("No segments found. Cannot balance.", new Object[0]);
            return;
        }
        int min = Math.min(druidCoordinatorRuntimeParams.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), i);
        int ceil = (int) Math.ceil(min * (druidCoordinatorRuntimeParams.getCoordinatorDynamicConfig().getDecommissioningMaxPercentOfMaxSegmentsToMove() / 100.0d));
        log.info("Processing %d segments for moving from decommissioning servers", Integer.valueOf(ceil));
        Pair<Integer, Integer> balanceServers = balanceServers(druidCoordinatorRuntimeParams, list, list2, ceil);
        int intValue = min - balanceServers.lhs.intValue();
        log.info("Processing %d segments for balancing between active servers", Integer.valueOf(intValue));
        Pair<Integer, Integer> balanceServers2 = balanceServers(druidCoordinatorRuntimeParams, list2, list2, intValue);
        int intValue2 = balanceServers2.lhs.intValue() + balanceServers.lhs.intValue();
        int intValue3 = balanceServers2.rhs.intValue() + balanceServers.rhs.intValue();
        if (intValue3 == min) {
            log.info("No good moves found in tier [%s]", str);
        }
        coordinatorStats.addToTieredStat("unmovedCount", str, intValue3);
        coordinatorStats.addToTieredStat("movedCount", str, intValue2);
        if (druidCoordinatorRuntimeParams.getCoordinatorDynamicConfig().emitBalancingStats()) {
            druidCoordinatorRuntimeParams.getBalancerStrategy().emitStats(str, coordinatorStats, Lists.newArrayList(sortedSet));
        }
        log.info("[%s]: Segments Moved: [%d] Segments Let Alone: [%d]", str, Integer.valueOf(intValue2), Integer.valueOf(intValue3));
    }

    private Pair<Integer, Integer> balanceServers(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams, List<ServerHolder> list, List<ServerHolder> list2, int i) {
        if (i <= 0) {
            return new Pair<>(0, 0);
        }
        BalancerStrategy balancerStrategy = druidCoordinatorRuntimeParams.getBalancerStrategy();
        int i2 = 2 * i;
        int maxSegmentsInNodeLoadingQueue = druidCoordinatorRuntimeParams.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
        int i3 = 0;
        int i4 = 0;
        Iterator<BalancerSegmentHolder> pickSegmentsToMove = balancerStrategy.pickSegmentsToMove(list, druidCoordinatorRuntimeParams.getBroadcastDatasources(), druidCoordinatorRuntimeParams.getCoordinatorDynamicConfig().useBatchedSegmentSampler() ? i : 1, druidCoordinatorRuntimeParams.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove());
        int i5 = 0;
        while (true) {
            if (i3 + i4 >= i) {
                break;
            }
            if (!pickSegmentsToMove.hasNext()) {
                log.info("All servers to move segments from are empty, ending run.", new Object[0]);
                break;
            }
            BalancerSegmentHolder next = pickSegmentsToMove.next();
            if (druidCoordinatorRuntimeParams.getUsedSegments().contains(next.getSegment())) {
                DataSegment segment = next.getSegment();
                ImmutableDruidServer fromServer = next.getFromServer();
                List<ServerHolder> list3 = (List) list2.stream().filter(serverHolder -> {
                    return serverHolder.getServer().equals(fromServer) || (!serverHolder.isServingSegment(segment) && (maxSegmentsInNodeLoadingQueue <= 0 || serverHolder.getNumberOfSegmentsInQueue() < maxSegmentsInNodeLoadingQueue));
                }).collect(Collectors.toList());
                if (list3.size() > 0) {
                    ServerHolder findNewSegmentHomeBalancer = balancerStrategy.findNewSegmentHomeBalancer(segment, list3);
                    if (findNewSegmentHomeBalancer == null || findNewSegmentHomeBalancer.getServer().equals(fromServer)) {
                        log.debug("Segment [%s] is 'optimally' placed.", segment.getId());
                        i4++;
                    } else if (moveSegment(next, findNewSegmentHomeBalancer.getServer(), druidCoordinatorRuntimeParams)) {
                        i3++;
                    } else {
                        i4++;
                    }
                } else {
                    log.debug("No valid movement destinations for segment [%s].", segment.getId());
                    i4++;
                }
            }
            if (i5 >= i2) {
                log.info("Unable to select %d remaining candidate segments out of %d total to balance after %d iterations, ending run.", Integer.valueOf((i - i3) - i4), Integer.valueOf(i), Integer.valueOf(i5));
                break;
            }
            i5++;
        }
        return new Pair<>(Integer.valueOf(i3), Integer.valueOf(i4));
    }

    protected boolean moveSegment(BalancerSegmentHolder balancerSegmentHolder, ImmutableDruidServer immutableDruidServer, DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams) {
        LoadQueuePeon loadQueuePeon = druidCoordinatorRuntimeParams.getLoadManagementPeons().get(immutableDruidServer.getName());
        ImmutableDruidServer fromServer = balancerSegmentHolder.getFromServer();
        DataSegment segment = balancerSegmentHolder.getSegment();
        SegmentId id = segment.getId();
        if (loadQueuePeon.getSegmentsToLoad().contains(segment) || immutableDruidServer.getSegment(id) != null || new ServerHolder(immutableDruidServer, loadQueuePeon).getAvailableSize() <= segment.getSize()) {
            return false;
        }
        log.debug("Moving [%s] from [%s] to [%s]", id, fromServer.getName(), immutableDruidServer.getName());
        LoadPeonCallback loadPeonCallback = null;
        try {
            ConcurrentHashMap<SegmentId, BalancerSegmentHolder> concurrentHashMap = this.currentlyMovingSegments.get(immutableDruidServer.getTier());
            concurrentHashMap.put(id, balancerSegmentHolder);
            loadPeonCallback = () -> {
            };
            this.coordinator.moveSegment(druidCoordinatorRuntimeParams, fromServer, immutableDruidServer, segment, loadPeonCallback);
            return true;
        } catch (Exception e) {
            log.makeAlert(e, StringUtils.format("[%s] : Moving exception", id), new Object[0]).emit();
            if (loadPeonCallback == null) {
                return false;
            }
            loadPeonCallback.execute();
            return false;
        }
    }
}
