package org.apache.distributedlog.service.balancer;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.util.Await;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.client.monitor.MonitorServiceClient;
import org.apache.distributedlog.service.ClientUtils;
import org.apache.distributedlog.service.DLSocketAddress;
import org.apache.distributedlog.service.DistributedLogClient;
import org.apache.distributedlog.service.DistributedLogClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/service/balancer/ClusterBalancer.class */
public class ClusterBalancer implements Balancer {
    /** Logger shared by the balancer and its anonymous callbacks. */
    private static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class);
    /** Builder passed to every {@link Host} so it can create a client bound to that host's address. */
    protected final DistributedLogClientBuilder clientBuilder;
    /** Client obtained together with {@link #monitor} at construction; closed by {@link #close()}. */
    protected final DistributedLogClient client;
    /** Monitor client used to fetch the current stream ownership distribution. */
    protected final MonitorServiceClient monitor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/service/balancer/ClusterBalancer$Host.class */
    /**
     * A single host of the cluster together with the set of streams it currently owns.
     *
     * <p>The client and monitor for the host are created lazily, on the first call to
     * {@link #getClient()} or {@link #getMonitor()}. Those accessors and {@link #close()} are
     * synchronized on the host instance.
     */
    public static class Host {
        final SocketAddress address;
        final Set<String> streams;
        final DistributedLogClientBuilder clientBuilder;
        DistributedLogClient client;
        MonitorServiceClient monitor;

        /**
         * @param socketAddress address of the host
         * @param set streams owned by the host; the set is kept by reference, not copied
         * @param distributedLogClientBuilder builder used as the template for this host's client
         */
        Host(SocketAddress socketAddress, Set<String> set, DistributedLogClientBuilder distributedLogClientBuilder) {
            this.address = socketAddress;
            this.streams = set;
            this.clientBuilder = distributedLogClientBuilder;
        }

        /** Builds the client/monitor pair for this host unless a client already exists. */
        private void initializeClientsIfNeeded() {
            if (this.client != null) {
                return;
            }
            Pair<DistributedLogClient, MonitorServiceClient> clients =
                    ClusterBalancer.createDistributedLogClient(this.address, this.clientBuilder);
            this.client = clients.getLeft();
            this.monitor = clients.getRight();
        }

        /** Returns the client bound to this host, creating it on first use. */
        synchronized DistributedLogClient getClient() {
            initializeClientsIfNeeded();
            return this.client;
        }

        /** Returns the monitor client bound to this host, creating it on first use. */
        synchronized MonitorServiceClient getMonitor() {
            initializeClientsIfNeeded();
            return this.monitor;
        }

        /** Closes the client if one was ever created; otherwise does nothing. */
        synchronized void close() {
            if (this.client == null) {
                return;
            }
            this.client.close();
        }

        @Override
        public String toString() {
            return "Host(" + this.address + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/service/balancer/ClusterBalancer$HostComparator.class */
    /**
     * Orders hosts by the number of streams they own, most loaded host first.
     */
    public static class HostComparator implements Comparator<Host>, Serializable {
        private static final long serialVersionUID = 7984973796525102538L;

        HostComparator() {
        }

        /**
         * Compares two hosts by stream count in descending order.
         *
         * @return a negative value when {@code host} owns more streams than {@code host2},
         *         zero when they own the same number, a positive value otherwise
         */
        @Override // java.util.Comparator
        public int compare(Host host, Host host2) {
            // Arguments are swapped on purpose: larger stream counts sort first.
            return Integer.compare(host2.streams.size(), host.streams.size());
        }
    }

    /**
     * Creates a balancer whose own client and monitor are built from the given builder
     * via {@code ClientUtils.buildClient}.
     *
     * @param distributedLogClientBuilder builder used for the balancer's own clients and kept
     *        for creating per-host clients later
     */
    public ClusterBalancer(DistributedLogClientBuilder distributedLogClientBuilder) {
        this(distributedLogClientBuilder, ClientUtils.buildClient(distributedLogClientBuilder));
    }

    /**
     * Creates a balancer from an already-built client/monitor pair.
     *
     * @param distributedLogClientBuilder builder kept for creating per-host clients
     * @param pair the client (left) and monitor client (right) this balancer uses
     */
    ClusterBalancer(DistributedLogClientBuilder distributedLogClientBuilder, Pair<DistributedLogClient, MonitorServiceClient> pair) {
        this.monitor = pair.getRight();
        this.client = pair.getLeft();
        this.clientBuilder = distributedLogClientBuilder;
    }

    /**
     * Builds a client/monitor pair bound to a single host.
     *
     * @param socketAddress address of the host the new client should talk to
     * @param distributedLogClientBuilder template builder; a new builder is derived from it
     *        via {@code DistributedLogClientBuilder.newBuilder}
     * @return the client (left) and monitor client (right) for that host
     */
    static Pair<DistributedLogClient, MonitorServiceClient> createDistributedLogClient(SocketAddress socketAddress, DistributedLogClientBuilder distributedLogClientBuilder) {
        DistributedLogClientBuilder hostBuilder =
                DistributedLogClientBuilder.newBuilder(distributedLogClientBuilder).host(socketAddress);
        return ClientUtils.buildClient(hostBuilder);
    }

    /**
     * Moves streams away from the given source host by delegating to the five-argument
     * {@code balance} with a water mark of {@code 0} and a tolerance of {@code 0.0}.
     *
     * @param str address of the source host, in the textual form the delegate parses
     * @param i forwarded unchanged as the delegate's third argument
     * @param optional rate limiter forwarded to the delegate
     */
    @Override // org.apache.distributedlog.service.balancer.Balancer
    public void balanceAll(String str, int i, Optional<RateLimiter> optional) {
        final Optional<String> source = Optional.of(str);
        final int waterMark = 0;
        final double tolerancePercentage = 0.0d;
        balance(waterMark, tolerancePercentage, i, source, optional);
    }

    /**
     * Balances streams across all hosts without a designated source host, by delegating to
     * the five-argument {@code balance} with an absent source.
     *
     * @param i water mark forwarded to the delegate
     * @param d tolerance percentage forwarded to the delegate
     * @param i2 forwarded unchanged as the delegate's third argument
     * @param optional rate limiter forwarded to the delegate
     */
    @Override // org.apache.distributedlog.service.balancer.Balancer
    public void balance(int i, double d, int i2, Optional<RateLimiter> optional) {
        final Optional<String> noSource = Optional.absent();
        balance(i, d, i2, noSource, optional);
    }

    /**
     * Rebalances stream ownership across the hosts reported by the monitor client.
     *
     * <p>Nothing happens when the distribution has fewer than two hosts, or when a source is
     * given that does not appear in the distribution. When a source host is given, streams
     * above the water mark are first pushed from it to the least-loaded hosts, and whatever
     * it still owns is then spread over the other hosts. Without a source, streams are moved
     * from the most-loaded hosts towards the least-loaded ones.
     *
     * <p>Every per-host client created while moving streams is closed before this method
     * returns, whether it completes normally or throws.
     *
     * @param i water mark: a host stops giving streams away once it owns this many (without a
     *        source, the rounded-up average load is used instead when that is larger)
     * @param d tolerance, as a percentage of the average load, by which a receiving host may
     *        exceed the average
     * @param i2 not used by this implementation
     * @param optional address of the host to move streams away from, or absent to balance all hosts
     * @param optional2 rate limiter acquired before each attempted move, if present
     */
    public void balance(int i, double d, int i2, Optional<String> optional, Optional<RateLimiter> optional2) {
        Map<SocketAddress, Set<String>> distribution = this.monitor.getStreamOwnershipDistribution();
        if (distribution.size() <= 1) {
            // Fewer than two hosts: nothing to balance (this also keeps the divisors below non-zero).
            return;
        }
        InetSocketAddress sourceAddr = null;
        if (optional.isPresent()) {
            sourceAddr = DLSocketAddress.parseSocketAddress(optional.get());
            logger.info("Balancer source is {}", sourceAddr);
            if (!distribution.containsKey(sourceAddr)) {
                return;
            }
        }

        // Hosts ordered by number of owned streams, most loaded first.
        List<Host> hosts = new ArrayList<Host>(distribution.size());
        for (Map.Entry<SocketAddress, Set<String>> entry : distribution.entrySet()) {
            hosts.add(new Host(entry.getKey(), entry.getValue(), this.clientBuilder));
        }
        Collections.sort(hosts, new HostComparator());

        // The try must span all stream moves: hosts create their clients lazily while moving,
        // and those clients have to be closed on every exit path.
        try {
            // Locate the source host in the sorted list (-1 when balancing all hosts).
            int sourceIdx = -1;
            if (null != sourceAddr) {
                for (int idx = 0; idx < hosts.size(); idx++) {
                    if (sourceAddr.equals(hosts.get(idx).address)) {
                        sourceIdx = idx;
                        break;
                    }
                }
            }

            int totalStreams = 0;
            for (Host host : hosts) {
                totalStreams += host.streams.size();
            }
            // A source host is being emptied, so it does not count as a receiver.
            int receivers = sourceIdx >= 0 ? hosts.size() - 1 : hosts.size();
            // Floating-point division: truncating here would skew both water marks.
            double averageLoad = (double) totalStreams / receivers;
            int toHighWaterMark = Math.max(1, (int) (averageLoad + ((averageLoad * d) / 100.0d)));

            if (sourceIdx >= 0) {
                moveStreams(hosts, new AtomicInteger(sourceIdx), Math.max(0, i),
                        new AtomicInteger(hosts.size() - 1), toHighWaterMark, optional2);
                moveRemainingStreamsFromSource(hosts.get(sourceIdx), hosts, optional2);
            } else {
                int fromLowWaterMark = Math.max((int) Math.ceil(averageLoad), i);
                AtomicInteger moveFrom = new AtomicInteger(0);
                AtomicInteger moveTo = new AtomicInteger(hosts.size() - 1);
                // Walk from the most loaded host towards the least loaded until the cursors meet.
                while (moveFrom.get() < moveTo.get()) {
                    moveStreams(hosts, moveFrom, fromLowWaterMark, moveTo, toHighWaterMark, optional2);
                    moveFrom.incrementAndGet();
                }
            }
        } finally {
            for (Host host : hosts) {
                host.close();
            }
        }
    }

    /**
     * Moves streams off one host until it owns no more than a low-water mark, handing each
     * stream to a target host that is still below a high-water mark.
     *
     * <p>The target cursor is decremented whenever the current target is full; the method
     * returns as soon as that cursor reaches the source index. A stream whose move fails is
     * not retried, but still counts against the number of streams to move.
     *
     * @param list hosts to balance
     * @param atomicInteger index of the host to move streams from; only read here
     * @param i low-water mark for the source host
     * @param atomicInteger2 index of the current target host; decremented in place as targets fill up
     * @param i2 high-water mark for target hosts
     * @param optional rate limiter acquired before each attempted move, if present
     */
    void moveStreams(List<Host> list, AtomicInteger atomicInteger, int i, AtomicInteger atomicInteger2, int i2, Optional<RateLimiter> optional) {
        final int hostCount = list.size();
        final int fromIdx = atomicInteger.get();
        final int startToIdx = atomicInteger2.get();
        final boolean indicesUsable = fromIdx >= 0 && fromIdx < hostCount
                && startToIdx >= 0 && startToIdx < hostCount
                && fromIdx < startToIdx;
        if (!indicesUsable) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Moving streams : hosts = {}, from = {}, to = {} : from_low_water_mark = {}, to_high_water_mark = {}", new Object[]{list, fromIdx, startToIdx, i, i2});
        }

        final Host source = list.get(fromIdx);
        final int owned = source.streams.size();
        if (owned <= i) {
            // Already at or below the low-water mark.
            return;
        }
        final int excess = owned - i;
        final LinkedList<String> candidates = new LinkedList<String>(source.streams);
        Collections.shuffle(candidates);
        if (logger.isDebugEnabled()) {
            logger.debug("Try to move {} streams from host {} : streams = {}", new Object[]{excess, source.address, candidates});
        }

        for (int attempted = 0; attempted < excess && !candidates.isEmpty(); attempted++) {
            if (optional.isPresent()) {
                optional.get().acquire();
            }
            // Advance the target cursor past hosts that have reached the high-water mark.
            Host target = list.get(atomicInteger2.get());
            while (target.streams.size() >= i2) {
                final int toIdx = atomicInteger2.decrementAndGet();
                logger.info("move to host : {}, from {}", toIdx, fromIdx);
                if (toIdx <= fromIdx) {
                    return;
                }
                target = list.get(toIdx);
                if (logger.isDebugEnabled()) {
                    logger.debug("Target host to move moved to host {} @ {}", toIdx, target);
                }
            }
            final String stream = candidates.remove();
            if (moveStream(stream, source, target)) {
                // Keep the in-memory ownership view in step with the successful move.
                source.streams.remove(stream);
                target.streams.add(stream);
            }
        }
    }

    /**
     * Spreads every stream still owned by the source host over the other hosts, taking
     * targets round-robin from the end of the list towards the start.
     *
     * <p>Hosts whose address equals the source's address are never used as targets. If no
     * other host exists the method returns without moving anything. A stream whose move
     * fails is not retried.
     *
     * @param host the source host to empty
     * @param list all hosts, possibly including the source itself
     * @param optional rate limiter acquired before each attempted move, if present
     */
    void moveRemainingStreamsFromSource(Host host, List<Host> list, Optional<RateLimiter> optional) {
        LinkedList<String> remaining = new LinkedList<String>(host.streams);
        Collections.shuffle(remaining);
        if (logger.isDebugEnabled()) {
            logger.debug("Try to move remaining streams from {} : {}", host, remaining);
        }

        // Eligible targets in visiting order: last host first, source excluded. Filtering up
        // front means no rate-limiter permit is spent on skipping the source, and the loop
        // below cannot spin forever when the source is the only host.
        List<Host> targets = new ArrayList<Host>(list.size());
        for (int idx = list.size() - 1; idx >= 0; idx--) {
            Host candidate = list.get(idx);
            if (!candidate.address.equals(host.address)) {
                targets.add(candidate);
            }
        }
        if (targets.isEmpty()) {
            if (!remaining.isEmpty()) {
                logger.warn("No other host available to take the remaining streams from {}", host);
            }
            return;
        }

        int next = 0;
        while (!remaining.isEmpty()) {
            if (optional.isPresent()) {
                optional.get().acquire();
            }
            Host target = targets.get(next);
            next = (next + 1) % targets.size();
            String stream = remaining.remove();
            if (moveStream(stream, host, target)) {
                // Keep the in-memory ownership view in step with the successful move.
                host.streams.remove(stream);
                target.streams.add(stream);
            }
        }
    }

    /**
     * Attempts to move one stream between two hosts, on a best-effort basis.
     *
     * <p>Failures are logged and reported through the return value rather than thrown. If
     * the calling thread is interrupted while waiting, its interrupt status is restored.
     *
     * @param stream name of the stream to move
     * @param from host currently owning the stream
     * @param to host that should take the stream over
     * @return {@code true} if the move completed, {@code false} if it failed
     */
    private boolean moveStream(String stream, Host from, Host to) {
        try {
            doMoveStream(stream, from, to);
            return true;
        } catch (InterruptedException ie) {
            // Preserve the interrupt so code further up the stack can react to it.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while moving stream {} from {} to {}", new Object[]{stream, from.address, to.address, ie});
            return false;
        } catch (Exception e) {
            // Best effort: record the failure (a failed release is not logged anywhere else) and carry on.
            logger.warn("Failed to move stream {} from {} to {}", new Object[]{stream, from.address, to.address, e});
            return false;
        }
    }

    /**
     * Releases a stream on its current host, then issues a check for it on the target host,
     * blocking until the combined operation finishes.
     *
     * @param stream name of the stream to move
     * @param from host that releases the stream
     * @param to host whose monitor client is asked to check the stream
     * @throws Exception whatever {@code Await.result} throws for the combined future
     */
    private void doMoveStream(final String stream, final Host from, final Host to) throws Exception {
        logger.info("Moving stream {} from {} to {}.", new Object[]{stream, from.address, to.address});

        // Logs the outcome of the check on the target host.
        final FutureEventListener<Void> outcomeLogger = new FutureEventListener<Void>() {
            @Override
            public void onSuccess(Void ignored) {
                logger.info("Moved stream {} from {} to {}.", new Object[]{stream, from.address, to.address});
            }

            @Override
            public void onFailure(Throwable cause) {
                logger.info("Failed to move stream {} from {} to {} : ", new Object[]{stream, from.address, to.address, cause});
            }
        };

        // Runs only after the release has succeeded.
        final Function<Void, Future<Void>> checkOnTarget = new Function<Void, Future<Void>>() {
            @Override
            public Future<Void> apply(Void released) {
                logger.info("Released stream {} from {}.", stream, from.address);
                return to.getMonitor().check(stream).addEventListener(outcomeLogger);
            }
        };

        Await.result(from.getClient().release(stream).flatMap(checkOnTarget));
    }

    /**
     * Closes the balancer's own client.
     */
    @Override // org.apache.distributedlog.service.balancer.Balancer
    public void close() {
        client.close();
    }
}
