package org.apache.twill.discovery;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/twill/discovery/ZKDiscoveryService.class */
public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
    private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
    private static final String NAMESPACE = "/discoverable";
    private static final long RETRY_MILLIS = 1000;
    private final Multimap<Discoverable, DiscoveryCancellable> discoverables;
    private final Lock lock;
    private final LoadingCache<String, ServiceDiscovered> services;
    private final ZKClient zkClient;
    private final ScheduledExecutorService retryExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/twill/discovery/ZKDiscoveryService$DiscoveryCancellable.class */
    public final class DiscoveryCancellable implements Cancellable {
        private final Discoverable discoverable;
        private final AtomicBoolean cancelled = new AtomicBoolean();
        private volatile String path;

        DiscoveryCancellable(Discoverable discoverable) {
            this.discoverable = discoverable;
        }

        void setPath(String str) {
            this.path = str;
            if (!this.cancelled.get() || str == null) {
                return;
            }
            ZKDiscoveryService.this.zkClient.delete(str);
        }

        public void cancel() {
            String str;
            if (this.cancelled.compareAndSet(false, true) && (str = this.path) != null) {
                ZKDiscoveryService.this.lock.lock();
                try {
                    ZKDiscoveryService.this.discoverables.remove(this.discoverable, this);
                    ZKDiscoveryService.this.lock.unlock();
                    Futures.getUnchecked(ZKOperations.ignoreError(ZKDiscoveryService.this.zkClient.delete(str), KeeperException.NoNodeException.class, str));
                    ZKDiscoveryService.LOG.debug("Service unregistered: {} {}", this.discoverable, str);
                } catch (Throwable th) {
                    ZKDiscoveryService.this.lock.unlock();
                    throw th;
                }
            }
        }
    }

    public ZKDiscoveryService(ZKClient zKClient) {
        this(zKClient, NAMESPACE);
    }

    public ZKDiscoveryService(ZKClient zKClient, String str) {
        this.discoverables = HashMultimap.create();
        this.lock = new ReentrantLock();
        this.retryExecutor = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("zk-discovery-retry"));
        this.zkClient = str == null ? zKClient : ZKClients.namespace(zKClient, str);
        this.services = CacheBuilder.newBuilder().build(createServiceLoader());
        this.zkClient.addConnectionWatcher(createConnectionWatcher());
    }

    public Cancellable register(final Discoverable discoverable) {
        final DiscoverableWrapper discoverableWrapper = new DiscoverableWrapper(discoverable);
        final SettableFuture create = SettableFuture.create();
        final DiscoveryCancellable discoveryCancellable = new DiscoveryCancellable(discoverableWrapper);
        Futures.addCallback(doRegister(discoverableWrapper), new FutureCallback<String>() { // from class: org.apache.twill.discovery.ZKDiscoveryService.1
            public void onSuccess(String str) {
                discoveryCancellable.setPath(str);
                ZKDiscoveryService.this.lock.lock();
                try {
                    ZKDiscoveryService.this.discoverables.put(discoverableWrapper, discoveryCancellable);
                    ZKDiscoveryService.this.lock.unlock();
                    ZKDiscoveryService.LOG.debug("Service registered: {} {}", discoverableWrapper, str);
                    create.set(str);
                } catch (Throwable th) {
                    ZKDiscoveryService.this.lock.unlock();
                    throw th;
                }
            }

            public void onFailure(Throwable th) {
                if (th instanceof KeeperException.NodeExistsException) {
                    ZKDiscoveryService.this.handleRegisterFailure(discoverable, create, this, th);
                } else {
                    ZKDiscoveryService.LOG.warn("Failed to register: {}", discoverableWrapper, th);
                    create.setException(th);
                }
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        Futures.getUnchecked(create);
        return discoveryCancellable;
    }

    public ServiceDiscovered discover(String str) {
        return (ServiceDiscovered) this.services.getUnchecked(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleRegisterFailure(final Discoverable discoverable, final SettableFuture<String> settableFuture, final FutureCallback<String> futureCallback, final Throwable th) {
        final String nodePath = getNodePath(discoverable);
        Futures.addCallback(this.zkClient.exists(nodePath), new FutureCallback<Stat>() { // from class: org.apache.twill.discovery.ZKDiscoveryService.2
            public void onSuccess(Stat stat) {
                if (stat == null) {
                    ZKDiscoveryService.LOG.info("Node {} is gone. Retry registration for {}.", nodePath, discoverable);
                    ZKDiscoveryService.this.retryRegister(discoverable, futureCallback);
                    return;
                }
                long ephemeralOwner = stat.getEphemeralOwner();
                if (ephemeralOwner == 0) {
                    ZKDiscoveryService.LOG.error("Node {} already exists and is not an ephemeral node. Discoverable registration failed: {}.", nodePath, discoverable);
                    settableFuture.setException(th);
                    return;
                }
                Long sessionId = ZKDiscoveryService.this.zkClient.getSessionId();
                if (sessionId != null && ephemeralOwner == sessionId.longValue()) {
                    settableFuture.set(nodePath);
                } else {
                    ZKDiscoveryService.LOG.info("Owner of {} is different. Retry registration for {}.", nodePath, discoverable);
                    ZKDiscoveryService.this.retryRegister(discoverable, futureCallback);
                }
            }

            public void onFailure(Throwable th2) {
                ZKDiscoveryService.LOG.warn("Error when getting stats on {}. Retry registration for {}.", nodePath, discoverable);
                ZKDiscoveryService.this.retryRegister(discoverable, futureCallback);
            }
        }, Threads.SAME_THREAD_EXECUTOR);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public OperationFuture<String> doRegister(Discoverable discoverable) {
        return this.zkClient.create(getNodePath(discoverable), DiscoverableAdapter.encode(discoverable), CreateMode.EPHEMERAL, true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void retryRegister(final Discoverable discoverable, final FutureCallback<String> futureCallback) {
        this.retryExecutor.schedule(new Runnable() { // from class: org.apache.twill.discovery.ZKDiscoveryService.3
            @Override // java.lang.Runnable
            public void run() {
                Futures.addCallback(ZKDiscoveryService.this.doRegister(discoverable), futureCallback, Threads.SAME_THREAD_EXECUTOR);
            }
        }, RETRY_MILLIS, TimeUnit.MILLISECONDS);
    }

    private String getNodePath(Discoverable discoverable) {
        InetSocketAddress socketAddress = discoverable.getSocketAddress();
        return String.format("/%s/%s", discoverable.getName(), Hashing.md5().newHasher().putBytes(socketAddress.getAddress().getAddress()).putInt(socketAddress.getPort()).hash().toString());
    }

    private Watcher createConnectionWatcher() {
        return new Watcher() { // from class: org.apache.twill.discovery.ZKDiscoveryService.4
            private boolean expired;

            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Watcher.Event.KeeperState.Expired) {
                    ZKDiscoveryService.LOG.warn("ZK Session expired: {}", ZKDiscoveryService.this.zkClient.getConnectString());
                    this.expired = true;
                    return;
                }
                if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected && this.expired) {
                    ZKDiscoveryService.LOG.info("Reconnected after expiration: {}", ZKDiscoveryService.this.zkClient.getConnectString());
                    this.expired = false;
                    ZKDiscoveryService.this.lock.lock();
                    try {
                        for (final Map.Entry entry : ZKDiscoveryService.this.discoverables.entries()) {
                            ZKDiscoveryService.LOG.info("Re-registering service: {}", entry.getKey());
                            Futures.addCallback(ZKDiscoveryService.this.doRegister((Discoverable) entry.getKey()), new FutureCallback<String>() { // from class: org.apache.twill.discovery.ZKDiscoveryService.4.1
                                public void onSuccess(String str) {
                                    ((DiscoveryCancellable) entry.getValue()).setPath(str);
                                    ZKDiscoveryService.LOG.debug("Service re-registered: {} {}", entry.getKey(), str);
                                }

                                public void onFailure(Throwable th) {
                                    ((DiscoveryCancellable) entry.getValue()).setPath(null);
                                    ZKDiscoveryService.LOG.error("Failed to re-register service: {}", entry.getKey(), th);
                                }
                            }, Threads.SAME_THREAD_EXECUTOR);
                        }
                    } finally {
                        ZKDiscoveryService.this.lock.unlock();
                    }
                }
            }
        };
    }

    private CacheLoader<String, ServiceDiscovered> createServiceLoader() {
        return new CacheLoader<String, ServiceDiscovered>() { // from class: org.apache.twill.discovery.ZKDiscoveryService.5
            public ServiceDiscovered load(String str) throws Exception {
                final DefaultServiceDiscovered defaultServiceDiscovered = new DefaultServiceDiscovered(str);
                final String str2 = "/" + str;
                ZKOperations.watchChildren(ZKDiscoveryService.this.zkClient, str2, new ZKOperations.ChildrenCallback() { // from class: org.apache.twill.discovery.ZKDiscoveryService.5.1
                    public void updated(NodeChildren nodeChildren) {
                        List children = nodeChildren.getChildren();
                        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(children.size());
                        Iterator it = children.iterator();
                        while (it.hasNext()) {
                            newArrayListWithCapacity.add(ZKDiscoveryService.this.zkClient.getData(str2 + "/" + ((String) it.next())));
                        }
                        final ListenableFuture successfulAsList = Futures.successfulAsList(newArrayListWithCapacity);
                        successfulAsList.addListener(new Runnable() { // from class: org.apache.twill.discovery.ZKDiscoveryService.5.1.1
                            @Override // java.lang.Runnable
                            public void run() {
                                Discoverable decode;
                                ImmutableSet.Builder builder = ImmutableSet.builder();
                                for (NodeData nodeData : (List) Futures.getUnchecked(successfulAsList)) {
                                    if (nodeData != null && (decode = DiscoverableAdapter.decode(nodeData.getData())) != null) {
                                        builder.add(decode);
                                    }
                                }
                                defaultServiceDiscovered.setDiscoverables(builder.build());
                            }
                        }, Threads.SAME_THREAD_EXECUTOR);
                    }
                });
                return defaultServiceDiscovered;
            }
        };
    }
}
