package com.linkedin.d2.balancer.zkfs;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.d2.balancer.Directory;
import com.linkedin.d2.balancer.Facilities;
import com.linkedin.d2.balancer.KeyMapper;
import com.linkedin.d2.balancer.LoadBalancer;
import com.linkedin.d2.balancer.LoadBalancerWithFacilities;
import com.linkedin.d2.balancer.ServiceUnavailableException;
import com.linkedin.d2.balancer.properties.ServiceProperties;
import com.linkedin.d2.balancer.util.ClientFactoryProvider;
import com.linkedin.d2.balancer.util.DirectoryProvider;
import com.linkedin.d2.balancer.util.HostToKeyMapper;
import com.linkedin.d2.balancer.util.KeyMapperProvider;
import com.linkedin.d2.balancer.util.MapKeyResult;
import com.linkedin.d2.balancer.util.TogglingLoadBalancer;
import com.linkedin.d2.balancer.util.hashing.ConsistentHashKeyMapper;
import com.linkedin.d2.balancer.util.hashing.HashRingProvider;
import com.linkedin.d2.balancer.util.hashing.Ring;
import com.linkedin.d2.balancer.util.partitions.PartitionAccessor;
import com.linkedin.d2.balancer.util.partitions.PartitionInfoProvider;
import com.linkedin.d2.discovery.event.PropertyEventThread;
import com.linkedin.d2.discovery.stores.zk.ZKConnection;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.transport.common.TransportClientFactory;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import com.linkedin.r2.util.NamedThreadFactory;
import java.io.File;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/d2-11.0.0.jar:com/linkedin/d2/balancer/zkfs/ZKFSLoadBalancer.class */
public class ZKFSLoadBalancer implements LoadBalancerWithFacilities, DirectoryProvider, KeyMapperProvider, HashRingProvider, PartitionInfoProvider, ClientFactoryProvider {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ZKFSLoadBalancer.class);
    /** ZooKeeper connect string; start() reports an error to its callback when this is null or empty. */
    private final String _connectString;
    /** ZooKeeper session timeout; logged in milliseconds and passed to every ZKConnection created by start(). */
    private final int _sessionTimeout;
    /** Delay, in milliseconds, after which start() falls back to backup stores if the startup callback is still pending. */
    private final int _initialZKTimeout;
    /** Passed through unchanged to the ZKConnection constructor. */
    private final boolean _shutdownAsynchronously;
    /** Passed through unchanged to the ZKConnection constructor. */
    private final boolean _isSymlinkAware;
    /** Callback of the startup currently in progress, or null when none is pending; claimed with getAndSet(null). */
    private final AtomicReference<Callback<None>> _startupCallback;
    /** Creates the TogglingLoadBalancer for each call to start(). */
    private final TogglingLoadBalancerFactory _loadBalancerFactory;
    /** Optional flag file; while it exists, SyncConnected events enable the backup stores instead of the primary ones. Null when not configured. */
    private final File _zkFlagFile;
    /** Directory returned by getDirectory(); receives the current connection when the primary stores are enabled. */
    private final ZKFSDirectory _directory;
    /** Delay, in milliseconds, before the old balancer is shut down after a successful reset; initialised to 1000. */
    private volatile long _delayedExecution;
    /** Executor on which startup, state-change handling and delayed tasks run; shut down by shutdown(). */
    private final ScheduledExecutorService _executor;
    /** Key mapper returned by getKeyMapper(); built in the constructor with this instance as both arguments. */
    private final KeyMapper _keyMapper;
    /** Connection created by the most recent call to start(); null until start() has created one. */
    private volatile ZKConnection _zkConnection;
    /** Balancer that the delegating methods forward to; null until start() has created one. */
    private volatile LoadBalancer _currentLoadBalancer;

    /**
     * Factory for the {@link TogglingLoadBalancer} instances that ZKFSLoadBalancer delegates to.
     * {@code start} invokes it once per startup.
     */
    public interface TogglingLoadBalancerFactory {
        /**
         * Creates a balancer for one ZooKeeper connection.
         *
         * @param zKConnection the connection that {@code start} has just created
         * @param scheduledExecutorService the executor owned by the calling ZKFSLoadBalancer
         * @return the balancer that will be toggled between primary and backup stores
         */
        TogglingLoadBalancer createLoadBalancer(ZKConnection zKConnection, ScheduledExecutorService scheduledExecutorService);
    }

    /**
     * Connection state listener bound to a single {@link TogglingLoadBalancer}.
     * Each notification is logged and then handed to the outer executor, where it
     * is turned into an action on that balancer.
     */
    private class ZKListener implements ZKConnection.StateListener {
        private final TogglingLoadBalancer _balancer;

        private ZKListener(TogglingLoadBalancer togglingLoadBalancer) {
            _balancer = togglingLoadBalancer;
        }

        /**
         * Logs the new state together with the session id of the outer class's
         * current connection, then queues the actual handling on the executor.
         *
         * @param keeperState the state reported by the connection
         */
        @Override // com.linkedin.d2.discovery.stores.zk.ZKConnection.StateListener
        public void notifyStateChange(final Watcher.Event.KeeperState keeperState) {
            long sessionId = _zkConnection.getZooKeeper().getSessionId();
            LOG.info("ZooKeeper session {} received KeeperState {}", sessionId, keeperState);
            Runnable handler = new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    dispatch(keeperState);
                }
            };
            _executor.execute(handler);
        }

        /**
         * Maps a state to its action: SyncConnected goes to processSyncConnected,
         * Disconnected enables the backup stores, Expired triggers a reset, and
         * anything else is logged and ignored.
         */
        private void dispatch(Watcher.Event.KeeperState state) {
            switch (state) {
                case SyncConnected:
                    processSyncConnected(_balancer);
                    break;
                case Disconnected:
                    LOG.info("Enabling backup stores");
                    _balancer.enableBackup(getStartupOrLoggerCallback());
                    break;
                case Expired:
                    reset(_balancer);
                    break;
                default:
                    LOG.info("Ignoring unknown state change {}", state);
                    break;
            }
        }
    }

    /**
     * Convenience constructor: delegates to the seven-argument constructor with
     * {@code false} for its last flag (stored as {@code _shutdownAsynchronously}).
     *
     * @param str ZooKeeper connect string
     * @param i ZooKeeper session timeout
     * @param i2 initial ZooKeeper connect timeout, used in milliseconds by start()
     * @param togglingLoadBalancerFactory factory for the delegate balancer
     * @param str2 path of the ZK flag file, or null for none
     * @param str3 argument for the ZKFSDirectory constructor (presumably the ZooKeeper base path; confirm against ZKFSDirectory)
     */
    public ZKFSLoadBalancer(String str, int i, int i2, TogglingLoadBalancerFactory togglingLoadBalancerFactory, String str2, String str3) {
        this(str, i, i2, togglingLoadBalancerFactory, str2, str3, false);
    }

    /**
     * Convenience constructor: delegates to the nine-argument constructor with
     * {@code false} for the symlink-aware flag and a new single-threaded scheduled
     * executor whose thread factory is named "D2 PropertyEventExecutor".
     *
     * @param str ZooKeeper connect string
     * @param i ZooKeeper session timeout
     * @param i2 initial ZooKeeper connect timeout, used in milliseconds by start()
     * @param togglingLoadBalancerFactory factory for the delegate balancer
     * @param str2 path of the ZK flag file, or null for none
     * @param str3 argument for the ZKFSDirectory constructor (presumably the ZooKeeper base path; confirm against ZKFSDirectory)
     * @param z stored as {@code _shutdownAsynchronously} and passed to each ZKConnection
     */
    public ZKFSLoadBalancer(String str, int i, int i2, TogglingLoadBalancerFactory togglingLoadBalancerFactory, String str2, String str3, boolean z) {
        this(str, i, i2, togglingLoadBalancerFactory, str2, str3, z, false, Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("D2 PropertyEventExecutor")));
    }

    /**
     * Primary constructor; only records configuration. No connection is opened
     * and no balancer is created until start() is called.
     *
     * @param str ZooKeeper connect string
     * @param i ZooKeeper session timeout
     * @param i2 initial ZooKeeper connect timeout, used in milliseconds by start()
     * @param togglingLoadBalancerFactory factory for the delegate balancer
     * @param str2 path of the ZK flag file, or null for none
     * @param str3 argument for the ZKFSDirectory constructor (presumably the ZooKeeper base path; confirm against ZKFSDirectory)
     * @param z passed to each ZKConnection as the shutdown-asynchronously flag
     * @param z2 passed to each ZKConnection as the symlink-aware flag
     * @param scheduledExecutorService executor for startup, state-change and delayed tasks
     */
    public ZKFSLoadBalancer(String str, int i, int i2, TogglingLoadBalancerFactory togglingLoadBalancerFactory, String str2, String str3, boolean z, boolean z2, ScheduledExecutorService scheduledExecutorService) {
        _startupCallback = new AtomicReference<>();

        // ZooKeeper connection settings.
        _connectString = str;
        _sessionTimeout = i;
        _initialZKTimeout = i2;

        _loadBalancerFactory = togglingLoadBalancerFactory;
        _zkFlagFile = (str2 != null) ? new File(str2) : null;
        _directory = new ZKFSDirectory(str3);
        _shutdownAsynchronously = z;
        _isSymlinkAware = z2;
        _executor = scheduledExecutorService;

        // Created after the fields above are assigned because this instance is handed out here.
        _keyMapper = new ConsistentHashKeyMapper(this, this);
        _delayedExecution = 1000L;
    }

    /**
     * @return the delay, in milliseconds, applied before the old balancer is shut
     *         down after a successful reset
     */
    public long getDelayedExecution() {
        return this._delayedExecution;
    }

    /**
     * Sets the delay applied before the old balancer is shut down after a
     * successful reset.
     *
     * @param j the new delay in milliseconds
     */
    public void setDelayedExecution(long j) {
        this._delayedExecution = j;
    }

    /**
     * Delegates client selection to the current load balancer.
     *
     * @param request the request to route
     * @param requestContext the context accompanying the request
     * @return the client chosen by the current load balancer
     * @throws ServiceUnavailableException if the delegate throws it
     * @throws IllegalStateException if no load balancer exists yet because
     *         start() has not been called
     */
    @Override // com.linkedin.d2.balancer.LoadBalancer
    public TransportClient getClient(Request request, RequestContext requestContext) throws ServiceUnavailableException {
        // Read the volatile field once so the check and the call use the same instance.
        final LoadBalancer balancer = this._currentLoadBalancer;
        if (balancer == null) {
            // Fail with a descriptive error instead of a bare NullPointerException,
            // in line with the other delegating methods of this class.
            throw new IllegalStateException("No LoadBalancer available to ZKFSLoadBalancer - the load balancer is not yet initialized");
        }
        return balancer.getClient(request, requestContext);
    }

    /**
     * Shuts down the current load balancer and, once it reports completion, the
     * ZooKeeper connection and the PropertyEvent executor, then notifies the caller.
     *
     * <p>The executor shutdown and the caller's callback sit in a {@code finally}
     * block, so each runs exactly once however the connection shutdown ends. An
     * exception thrown by the caller's callback propagates without the cleanup
     * being repeated.
     *
     * @param propertyEventShutdownCallback invoked after the connection and the
     *        executor have been shut down
     */
    @Override // com.linkedin.d2.balancer.LoadBalancer
    public void shutdown(final PropertyEventThread.PropertyEventShutdownCallback propertyEventShutdownCallback) {
        LOG.info("Shutting down");
        final PropertyEventThread.PropertyEventShutdownCallback afterBalancerShutdown = new PropertyEventThread.PropertyEventShutdownCallback() {
            @Override // com.linkedin.d2.discovery.event.PropertyEventThread.PropertyEventShutdownCallback
            public void done() {
                try {
                    ZKFSLoadBalancer.LOG.info("Shutting down ZooKeeper connection");
                    ZKFSLoadBalancer.this._zkConnection.shutdown();
                } catch (InterruptedException e) {
                    ZKFSLoadBalancer.LOG.warn("Unexpected exception during shutdown", e);
                    // Preserve the interrupt for whoever owns this thread.
                    Thread.currentThread().interrupt();
                } finally {
                    ZKFSLoadBalancer.LOG.info("Shutting down PropertyEvent executor");
                    ZKFSLoadBalancer.this._executor.shutdown();
                    propertyEventShutdownCallback.done();
                }
            }
        };
        this._currentLoadBalancer.shutdown(afterBalancerShutdown);
    }

    /**
     * Looks up service properties through the current load balancer.
     *
     * @param str the service name forwarded to the delegate
     * @return the delegate's answer, or null when no load balancer exists yet
     * @throws ServiceUnavailableException if the delegate throws it
     */
    @Override // com.linkedin.d2.balancer.LoadBalancer
    public ServiceProperties getLoadBalancedServiceProperties(String str) throws ServiceUnavailableException {
        LoadBalancer delegate = _currentLoadBalancer;
        return (delegate != null) ? delegate.getLoadBalancedServiceProperties(str) : null;
    }

    /**
     * Starts the load balancer. A new ZooKeeper connection and a new
     * {@link TogglingLoadBalancer} are created, and a "startup" event is queued on
     * the executor. That event registers a {@link ZKListener} on the connection,
     * starts the connection and arms the initial-connect timeout.
     *
     * <p>The pending startup callback is claimed by whichever comes first: a
     * ZooKeeper state change, the timeout task, or the start-failure path. When it
     * completes successfully, the new balancer becomes the current one.
     *
     * <p>The connection and balancer fields are published only after the startup
     * slot has been claimed. A call rejected with "Startup already in progress"
     * therefore leaves the live connection reference untouched.
     *
     * @param callback notified when startup completes; receives an
     *        IllegalArgumentException immediately if the connect string is null or empty
     * @throws IllegalStateException if an earlier startup is still pending
     */
    @Override // com.linkedin.d2.balancer.LoadBalancer
    public void start(final Callback<None> callback) {
        LOG.info("Starting ZKFSLoadBalancer");
        LOG.info("ZK connect string: {}", this._connectString);
        LOG.info("ZK session timeout: {}ms", Integer.valueOf(this._sessionTimeout));
        LOG.info("ZK initial connect timeout: {}ms", Integer.valueOf(this._initialZKTimeout));
        if (this._connectString == null || this._connectString.isEmpty()) {
            callback.onError(new IllegalArgumentException("ZooKeeper connection string is null or empty"));
            return;
        }
        if (this._zkFlagFile == null) {
            LOG.info("ZK flag file not specified");
        } else {
            LOG.info("ZK flag file: {}", this._zkFlagFile.getAbsolutePath());
            LOG.info("ZK currently suppressed by flag file: {}", Boolean.valueOf(suppressZK()));
        }
        final ZKConnection connection = new ZKConnection(this._connectString, this._sessionTimeout, this._shutdownAsynchronously, this._isSymlinkAware);
        final TogglingLoadBalancer balancer = this._loadBalancerFactory.createLoadBalancer(connection, this._executor);
        final Callback<None> startupCallback = new Callback<None>() {
            @Override // com.linkedin.common.callback.SuccessCallback
            public void onSuccess(None none) {
                // Switch delegates only once the new balancer is usable.
                ZKFSLoadBalancer.this._currentLoadBalancer = balancer;
                callback.onSuccess(none);
            }

            @Override // com.linkedin.common.callback.Callback
            public void onError(Throwable th) {
                callback.onError(th);
            }
        };
        if (!this._startupCallback.compareAndSet(null, startupCallback)) {
            throw new IllegalStateException("Startup already in progress");
        }
        // Publish only after the startup slot is ours, so a rejected start cannot
        // replace the connection that the running instance is using.
        this._zkConnection = connection;
        if (this._currentLoadBalancer == null) {
            this._currentLoadBalancer = balancer;
        }
        this._executor.execute(new PropertyEventThread.PropertyEvent("startup") {
            @Override // com.linkedin.d2.discovery.event.PropertyEventThread.PropertyEvent
            public void innerRun() {
                // Use the connection this balancer was built with rather than
                // re-reading the field, which a later start() may have replaced.
                connection.addStateListener(new ZKListener(balancer));
                try {
                    connection.start();
                    ZKFSLoadBalancer.LOG.info("Started ZooKeeper");
                    ZKFSLoadBalancer.this._executor.schedule(new Runnable() {
                        @Override // java.lang.Runnable
                        public void run() {
                            Callback<None> pending = ZKFSLoadBalancer.this._startupCallback.getAndSet(null);
                            if (pending != null) {
                                ZKFSLoadBalancer.LOG.error("No response from ZooKeeper within {}ms, enabling backup stores", Integer.valueOf(ZKFSLoadBalancer.this._initialZKTimeout));
                                balancer.enableBackup(pending);
                            }
                        }
                    }, ZKFSLoadBalancer.this._initialZKTimeout, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    ZKFSLoadBalancer.LOG.error("Failed to start ZooKeeper (bad configuration?), enabling backup stores", (Throwable) e);
                    // Never hand a null callback to enableBackup: fall back to the
                    // logging callback if the startup callback was already claimed.
                    balancer.enableBackup(ZKFSLoadBalancer.this.getStartupOrLoggerCallback());
                }
            }
        });
    }

    /**
     * @return the ZKFSDirectory created in the constructor
     */
    @Override // com.linkedin.d2.balancer.Facilities
    public Directory getDirectory() {
        return this._directory;
    }

    /**
     * @return this instance, which implements PartitionInfoProvider by delegating
     *         to the current load balancer
     */
    @Override // com.linkedin.d2.balancer.Facilities
    public PartitionInfoProvider getPartitionInfoProvider() {
        return this;
    }

    /**
     * @return the ConsistentHashKeyMapper created in the constructor
     */
    @Override // com.linkedin.d2.balancer.Facilities
    public KeyMapper getKeyMapper() {
        return this._keyMapper;
    }

    /**
     * Forwards the keyed ring lookup to the current load balancer.
     *
     * @param uri the service URI forwarded to the delegate
     * @param iterable the keys forwarded to the delegate
     * @return the delegate's result
     * @throws ServiceUnavailableException if the delegate throws it
     * @throws IllegalStateException if the current load balancer is missing or is
     *         not a HashRingProvider
     */
    @Override // com.linkedin.d2.balancer.util.hashing.HashRingProvider
    public <K> MapKeyResult<Ring<URI>, K> getRings(URI uri, Iterable<K> iterable) throws ServiceUnavailableException {
        checkLoadBalancer();
        HashRingProvider ringProvider = (HashRingProvider) _currentLoadBalancer;
        return ringProvider.getRings(uri, iterable);
    }

    /**
     * Forwards the ring lookup for a service to the current load balancer.
     *
     * @param uri the service URI forwarded to the delegate
     * @return the delegate's map of rings keyed by integer
     * @throws ServiceUnavailableException if the delegate throws it
     * @throws IllegalStateException if the current load balancer is missing or is
     *         not a HashRingProvider
     */
    @Override // com.linkedin.d2.balancer.util.hashing.HashRingProvider
    public Map<Integer, Ring<URI>> getRings(URI uri) throws ServiceUnavailableException {
        checkLoadBalancer();
        HashRingProvider ringProvider = (HashRingProvider) _currentLoadBalancer;
        return ringProvider.getRings(uri);
    }

    /**
     * Forwards the partition information lookup to the current load balancer.
     * All four arguments are passed through unchanged.
     *
     * @return the delegate's result
     * @throws ServiceUnavailableException if the delegate throws it
     * @throws IllegalStateException if the current load balancer is missing or is
     *         not a PartitionInfoProvider
     */
    @Override // com.linkedin.d2.balancer.util.partitions.PartitionInfoProvider
    public <K> HostToKeyMapper<K> getPartitionInformation(URI uri, Collection<K> collection, int i, int i2) throws ServiceUnavailableException {
        checkPartitionInfoProvider();
        PartitionInfoProvider infoProvider = (PartitionInfoProvider) _currentLoadBalancer;
        return infoProvider.getPartitionInformation(uri, collection, i, i2);
    }

    /**
     * Forwards the partition accessor lookup to the current load balancer.
     *
     * @param uri the service URI forwarded to the delegate
     * @return the delegate's partition accessor
     * @throws ServiceUnavailableException if the delegate throws it
     * @throws IllegalStateException if the current load balancer is missing or is
     *         not a PartitionInfoProvider
     */
    @Override // com.linkedin.d2.balancer.util.partitions.PartitionInfoProvider
    public PartitionAccessor getPartitionAccessor(URI uri) throws ServiceUnavailableException {
        checkPartitionInfoProvider();
        PartitionInfoProvider infoProvider = (PartitionInfoProvider) _currentLoadBalancer;
        return infoProvider.getPartitionAccessor(uri);
    }

    /**
     * Verifies that the current load balancer exists and implements
     * HashRingProvider.
     *
     * @throws IllegalStateException otherwise
     */
    public void checkLoadBalancer() {
        LoadBalancer current = _currentLoadBalancer;
        // instanceof is false for null, so this also covers the not-yet-started case.
        if (current instanceof HashRingProvider) {
            return;
        }
        throw new IllegalStateException("No HashRingProvider available to ZKFSLoadBalancer - this could be because the load balancer is not yet initialized, or because it has been configured with strategies that do not support consistent hashing.");
    }

    /**
     * Verifies that the current load balancer exists and implements
     * PartitionInfoProvider.
     *
     * @throws IllegalStateException otherwise
     */
    private void checkPartitionInfoProvider() {
        LoadBalancer current = _currentLoadBalancer;
        // instanceof is false for null, so this also covers the not-yet-started case.
        if (current instanceof PartitionInfoProvider) {
            return;
        }
        throw new IllegalStateException("No PartitionInfoProvider available to TogglingLoadBalancer - this could be because the load balancer is not yet initialized, or because it has been configured with strategies that do not support consistent hashing.");
    }

    /**
     * Returns the transport client factory that the current load balancer
     * provides for the given argument.
     *
     * @param str forwarded unchanged to the delegate's getClientFactory
     * @return the delegate's client factory
     * @throws IllegalStateException if the current load balancer is missing or is
     *         not a ClientFactoryProvider
     */
    @Override // com.linkedin.d2.balancer.Facilities
    public TransportClientFactory getClientFactory(String str) {
        // Read the volatile field once so the type check and the cast apply to the
        // same instance even if a reset swaps the balancer concurrently.
        final LoadBalancer balancer = this._currentLoadBalancer;
        if (!(balancer instanceof ClientFactoryProvider)) {
            throw new IllegalStateException("No ClientFactoryProvider available to ZKFSLoadBalancer - this could be because the load balancer is not yet initialized, or because it has been configured with a LoadBalancer which does not support obtaining client factories");
        }
        return ((ClientFactoryProvider) balancer).getClientFactory(str);
    }

    /**
     * @return this instance, viewed as Facilities
     */
    public Facilities getFacilities() {
        return this;
    }

    /**
     * @return true when a flag file was configured and currently exists
     */
    private boolean suppressZK() {
        if (_zkFlagFile == null) {
            return false;
        }
        return _zkFlagFile.exists();
    }

    /**
     * Claims the pending startup callback if there is one. Otherwise returns a
     * callback that only logs the outcome.
     *
     * @return the startup callback (now cleared from the holder), or a logging
     *         callback when no startup is pending
     */
    public Callback<None> getStartupOrLoggerCallback() {
        Callback<None> pending = _startupCallback.getAndSet(null);
        if (pending != null) {
            return pending;
        }
        return new Callback<None>() {
            @Override // com.linkedin.common.callback.SuccessCallback
            public void onSuccess(None none) {
                LOG.info("Enabled stores");
            }

            @Override // com.linkedin.common.callback.Callback
            public void onError(Throwable th) {
                LOG.error("Failed to enable stores", th);
            }
        };
    }

    /**
     * Handles a SyncConnected notification. The primary ZooKeeper stores are
     * enabled unless the flag file exists, in which case the backup stores are
     * enabled instead.
     *
     * @param togglingLoadBalancer the balancer to switch
     */
    public void processSyncConnected(TogglingLoadBalancer togglingLoadBalancer) {
        if (!suppressZK()) {
            LOG.info("Enabling primary ZK stores");
            _directory.setConnection(_zkConnection);
            Callback<None> primaryEnabled = new Callback<None>() {
                @Override // com.linkedin.common.callback.SuccessCallback
                public void onSuccess(None none) {
                    ZKFSLoadBalancer.this.getStartupOrLoggerCallback().onSuccess(none);
                }

                @Override // com.linkedin.common.callback.Callback
                public void onError(Throwable th) {
                    // The startup callback is intentionally left pending here.
                    LOG.info("Ignored error enabling primary ZK stores; expecting Disconnected notification", th);
                }
            };
            togglingLoadBalancer.enablePrimary(primaryEnabled);
            return;
        }

        String flagPath = _zkFlagFile.getAbsolutePath();
        LOG.warn("ZooKeeper currently suppressed by flag file {}, enabling backup stores", flagPath);
        togglingLoadBalancer.enableBackup(getStartupOrLoggerCallback());
    }

    /**
     * Restarts the load balancer. Any pending startup callback is failed with a
     * SessionExpiredException, then start() is called again. When the restart
     * succeeds, shutdown of the given old balancer is scheduled after
     * {@code _delayedExecution} milliseconds.
     *
     * @param loadBalancer the balancer to retire once the restart has succeeded;
     *        when null (no balancer existed yet) no shutdown is scheduled
     */
    public void reset(final LoadBalancer loadBalancer) {
        LOG.info("Resetting LoadBalancerState");
        Callback<None> pendingStartup = this._startupCallback.getAndSet(null);
        if (pendingStartup != null) {
            pendingStartup.onError(new KeeperException.SessionExpiredException());
        }
        start(new Callback<None>() {
            @Override // com.linkedin.common.callback.SuccessCallback
            public void onSuccess(None none) {
                ZKFSLoadBalancer.LOG.info("Successfully reset LoadBalancer after ZooKeeper session expiration");
                if (loadBalancer == null) {
                    // Nothing to retire; avoids a NullPointerException inside the scheduled task.
                    return;
                }
                ZKFSLoadBalancer.this._executor.schedule(new Runnable() {
                    @Override // java.lang.Runnable
                    public void run() {
                        loadBalancer.shutdown(new PropertyEventThread.PropertyEventShutdownCallback() {
                            @Override // com.linkedin.d2.discovery.event.PropertyEventThread.PropertyEventShutdownCallback
                            public void done() {
                                ZKFSLoadBalancer.LOG.info("Shut down old LoadBalancer after ZooKeeper session expiration");
                            }
                        });
                    }
                }, ZKFSLoadBalancer.this._delayedExecution, TimeUnit.MILLISECONDS);
            }

            @Override // com.linkedin.common.callback.Callback
            public void onError(Throwable th) {
                // Include the cause; without it the failure cannot be diagnosed from the log.
                ZKFSLoadBalancer.LOG.error("Failed to reset LoadBalancer after ZooKeeper session expiration", th);
            }
        });
    }

    /**
     * Resets using the current load balancer as the one to retire; see
     * {@link #reset(LoadBalancer)}.
     */
    public void reset() {
        reset(this._currentLoadBalancer);
    }

    /**
     * Package-private accessor for the connection field.
     *
     * @return the connection created by the most recent start(), or null if
     *         start() has not created one yet
     */
    ZKConnection zkConnection() {
        return this._zkConnection;
    }
}
