package org.apache.distributedlog.client.proxy;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.twitter.util.FutureEventListener;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.distributedlog.client.ClientConfig;
import org.apache.distributedlog.client.proxy.ProxyClient;
import org.apache.distributedlog.client.stats.ClientStats;
import org.apache.distributedlog.client.stats.OpStats;
import org.apache.distributedlog.thrift.service.ClientInfo;
import org.apache.distributedlog.thrift.service.ServerInfo;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/client/proxy/ProxyClientManager.class */
public class ProxyClientManager implements TimerTask {
    private static final Logger logger = LoggerFactory.getLogger(ProxyClientManager.class);
    private final ClientConfig clientConfig;
    private final ProxyClient.Builder clientBuilder;
    private final HashedWheelTimer timer;
    private final HostProvider hostProvider;
    private volatile Timeout periodicHandshakeTask;
    private final ConcurrentHashMap<SocketAddress, ProxyClient> address2Services = new ConcurrentHashMap<>();
    private final CopyOnWriteArraySet<ProxyListener> proxyListeners = new CopyOnWriteArraySet<>();
    private volatile boolean closed = false;
    private volatile boolean periodicHandshakeEnabled = true;
    private final Stopwatch lastOwnershipSyncStopwatch;
    private final OpStats handshakeStats;

    public ProxyClientManager(ClientConfig clientConfig, ProxyClient.Builder builder, HashedWheelTimer hashedWheelTimer, HostProvider hostProvider, ClientStats clientStats) {
        this.clientConfig = clientConfig;
        this.clientBuilder = builder;
        this.timer = hashedWheelTimer;
        this.hostProvider = hostProvider;
        this.handshakeStats = clientStats.getOpStats("handshake");
        scheduleHandshake();
        this.lastOwnershipSyncStopwatch = Stopwatch.createStarted();
    }

    private void scheduleHandshake() {
        if (this.clientConfig.getPeriodicHandshakeIntervalMs() > 0) {
            this.periodicHandshakeTask = this.timer.newTimeout(this, this.clientConfig.getPeriodicHandshakeIntervalMs(), TimeUnit.MILLISECONDS);
        }
    }

    void setPeriodicHandshakeEnabled(boolean z) {
        this.periodicHandshakeEnabled = z;
    }

    public void run(Timeout timeout) throws Exception {
        if (timeout.isCancelled() || this.closed) {
            return;
        }
        if (this.periodicHandshakeEnabled) {
            final boolean z = this.lastOwnershipSyncStopwatch.elapsed(TimeUnit.MILLISECONDS) >= this.clientConfig.getPeriodicOwnershipSyncIntervalMs();
            final Set<SocketAddress> hosts = this.hostProvider.getHosts();
            final AtomicInteger atomicInteger = new AtomicInteger(hosts.size());
            final AtomicInteger atomicInteger2 = new AtomicInteger(0);
            final AtomicInteger atomicInteger3 = new AtomicInteger(0);
            final AtomicInteger atomicInteger4 = new AtomicInteger(0);
            final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            final Stopwatch createStarted = Stopwatch.createStarted();
            for (final SocketAddress socketAddress : hosts) {
                final ProxyClient client = getClient(socketAddress);
                handshake(socketAddress, client, new FutureEventListener<ServerInfo>() { // from class: org.apache.distributedlog.client.proxy.ProxyClientManager.1
                    public void onSuccess(ServerInfo serverInfo) {
                        atomicInteger2.addAndGet(serverInfo.getOwnershipsSize());
                        atomicInteger3.incrementAndGet();
                        ProxyClientManager.this.notifyHandshakeSuccess(socketAddress, client, serverInfo, false, createStarted);
                        if (ProxyClientManager.this.clientConfig.isHandshakeTracingEnabled()) {
                            concurrentHashMap.putIfAbsent(socketAddress, Integer.valueOf(serverInfo.getOwnershipsSize()));
                        }
                        complete();
                    }

                    public void onFailure(Throwable th) {
                        atomicInteger4.incrementAndGet();
                        ProxyClientManager.this.notifyHandshakeFailure(socketAddress, client, th, createStarted);
                        complete();
                    }

                    private void complete() {
                        if (0 == atomicInteger.decrementAndGet() && z) {
                            ProxyClientManager.logger.info("Periodic handshaked with {} hosts : {} streams returned, {} hosts succeeded, {} hosts failed", new Object[]{Integer.valueOf(hosts.size()), Integer.valueOf(atomicInteger2.get()), Integer.valueOf(atomicInteger3.get()), Integer.valueOf(atomicInteger4.get())});
                            if (ProxyClientManager.this.clientConfig.isHandshakeTracingEnabled()) {
                                ProxyClientManager.logger.info("Periodic handshaked stream distribution : {}", concurrentHashMap);
                            }
                        }
                    }
                }, false, z);
            }
            if (z) {
                this.lastOwnershipSyncStopwatch.reset().start();
            }
        }
        scheduleHandshake();
    }

    public void registerProxyListener(ProxyListener proxyListener) {
        this.proxyListeners.add(proxyListener);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyHandshakeSuccess(SocketAddress socketAddress, ProxyClient proxyClient, ServerInfo serverInfo, boolean z, Stopwatch stopwatch) {
        if (z) {
            if (null == serverInfo || !serverInfo.isSetOwnerships()) {
                logger.info("Handshaked with {} : no ownerships returned", socketAddress);
            } else {
                logger.info("Handshaked with {} : {} ownerships returned.", socketAddress, Integer.valueOf(serverInfo.getOwnerships().size()));
            }
        }
        this.handshakeStats.completeRequest(socketAddress, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1);
        Iterator<ProxyListener> it = this.proxyListeners.iterator();
        while (it.hasNext()) {
            it.next().onHandshakeSuccess(socketAddress, proxyClient, serverInfo);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyHandshakeFailure(SocketAddress socketAddress, ProxyClient proxyClient, Throwable th, Stopwatch stopwatch) {
        this.handshakeStats.failRequest(socketAddress, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1);
        Iterator<ProxyListener> it = this.proxyListeners.iterator();
        while (it.hasNext()) {
            it.next().onHandshakeFailure(socketAddress, proxyClient, th);
        }
    }

    public ProxyClient getClient(SocketAddress socketAddress) {
        ProxyClient proxyClient = this.address2Services.get(socketAddress);
        return null != proxyClient ? proxyClient : createClient(socketAddress);
    }

    public void removeClient(SocketAddress socketAddress) {
        ProxyClient remove = this.address2Services.remove(socketAddress);
        if (null != remove) {
            logger.info("Removed host {}.", socketAddress);
            remove.close();
        }
    }

    public void removeClient(SocketAddress socketAddress, ProxyClient proxyClient) {
        if (this.address2Services.remove(socketAddress, proxyClient)) {
            logger.info("Remove client {} to host {}.", proxyClient, socketAddress);
            proxyClient.close();
        }
    }

    public ProxyClient createClient(final SocketAddress socketAddress) {
        final ProxyClient build = this.clientBuilder.build(socketAddress);
        ProxyClient putIfAbsent = this.address2Services.putIfAbsent(socketAddress, build);
        if (null != putIfAbsent) {
            build.close();
            return putIfAbsent;
        }
        final Stopwatch createStarted = Stopwatch.createStarted();
        handshake(socketAddress, build, new FutureEventListener<ServerInfo>() { // from class: org.apache.distributedlog.client.proxy.ProxyClientManager.2
            public void onSuccess(ServerInfo serverInfo) {
                ProxyClientManager.this.notifyHandshakeSuccess(socketAddress, build, serverInfo, true, createStarted);
            }

            public void onFailure(Throwable th) {
                ProxyClientManager.this.notifyHandshakeFailure(socketAddress, build, th, createStarted);
            }
        }, true, true);
        return build;
    }

    private void handshake(SocketAddress socketAddress, ProxyClient proxyClient, FutureEventListener<ServerInfo> futureEventListener, boolean z, boolean z2) {
        if (!this.clientConfig.getHandshakeWithClientInfo()) {
            if (z) {
                logger.info("Handshaking with {}", socketAddress);
            }
            proxyClient.getService().handshake().addEventListener(futureEventListener);
        } else {
            ClientInfo clientInfo = new ClientInfo();
            clientInfo.setGetOwnerships(z2);
            clientInfo.setStreamNameRegex(this.clientConfig.getStreamNameRegex());
            if (z) {
                logger.info("Handshaking with {} : {}", socketAddress, clientInfo);
            }
            proxyClient.getService().handshakeWithClientInfo(clientInfo).addEventListener(futureEventListener);
        }
    }

    public void handshake() {
        Set<SocketAddress> hosts = this.hostProvider.getHosts();
        logger.info("Handshaking with {} hosts.", Integer.valueOf(hosts.size()));
        final CountDownLatch countDownLatch = new CountDownLatch(hosts.size());
        final Stopwatch createStarted = Stopwatch.createStarted();
        for (final SocketAddress socketAddress : hosts) {
            final ProxyClient client = getClient(socketAddress);
            handshake(socketAddress, client, new FutureEventListener<ServerInfo>() { // from class: org.apache.distributedlog.client.proxy.ProxyClientManager.3
                public void onSuccess(ServerInfo serverInfo) {
                    ProxyClientManager.this.notifyHandshakeSuccess(socketAddress, client, serverInfo, true, createStarted);
                    countDownLatch.countDown();
                }

                public void onFailure(Throwable th) {
                    ProxyClientManager.this.notifyHandshakeFailure(socketAddress, client, th, createStarted);
                    countDownLatch.countDown();
                }
            }, true, true);
        }
        try {
            countDownLatch.await(1L, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            logger.warn("Interrupted on handshaking with servers : ", e);
        }
    }

    public int getNumProxies() {
        return this.address2Services.size();
    }

    public Map<SocketAddress, ProxyClient> getAllClients() {
        return ImmutableMap.copyOf(this.address2Services);
    }

    public void close() {
        this.closed = true;
        Timeout timeout = this.periodicHandshakeTask;
        if (null != timeout) {
            timeout.cancel();
        }
        Iterator<ProxyClient> it = this.address2Services.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }
}
