package org.apache.pulsar.zookeeper;

import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/zookeeper/ZooKeeperDataCache.class */
/**
 * Typed view over a shared {@link ZooKeeperCache}.
 *
 * <p>An instance passes itself to the underlying cache as the deserializer, the cache updater
 * and the ZooKeeper watcher for every lookup it issues. Subclasses supply the deserialization
 * logic required by {@link ZooKeeperCache.Deserializer}.
 *
 * <p>Listeners registered through {@link #registerListener} are notified from
 * {@link #reloadCache(String)} whenever a path is reloaded and the node still exists.
 *
 * @param <T> type of the deserialized node content
 */
public abstract class ZooKeeperDataCache<T> implements ZooKeeperCache.Deserializer<T>, ZooKeeperCache.CacheUpdater<T>, Watcher {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDataCache.class);

    /** Integer encoding of the shutdown flag, as required by {@link AtomicIntegerFieldUpdater}. */
    private static final int FALSE = 0;
    private static final int TRUE = 1;

    // The raw type is unavoidable here: a class literal of a generic class is always raw.
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<ZooKeeperDataCache> IS_SHUTDOWN_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ZooKeeperDataCache.class, "isShutdown");

    private final ZooKeeperCache cache;
    private final List<ZooKeeperCacheListener<T>> listeners = Lists.newCopyOnWriteArrayList();

    /** Set to {@link #TRUE} by {@link #close()}; read in {@link #process(WatchedEvent)}. */
    private volatile int isShutdown = FALSE;

    /**
     * Creates a data cache backed by the given ZooKeeper cache.
     *
     * @param zooKeeperCache underlying cache that every operation is delegated to
     */
    public ZooKeeperDataCache(ZooKeeperCache zooKeeperCache) {
        this.cache = zooKeeperCache;
    }

    /**
     * Asynchronously fetches the deserialized value stored at the given path.
     *
     * <p>If the underlying lookup fails, invalidation of the path is requested on the cache
     * before the returned future is completed exceptionally.
     *
     * @param str ZooKeeper path to read
     * @return future holding the value, or an empty {@link Optional} when the underlying cache
     *         reports no data for the path
     */
    public CompletableFuture<Optional<T>> getAsync(String str) {
        CompletableFuture<Optional<T>> result = new CompletableFuture<>();
        this.cache.getDataAsync(str, this, this)
                // Drop the Stat half of the entry; callers of this method only want the value.
                .thenAccept(entry -> result.complete(entry.map(Map.Entry::getKey)))
                .exceptionally(th -> {
                    this.cache.asyncInvalidate(str);
                    result.completeExceptionally(th);
                    return null;
                });
        return result;
    }

    /**
     * Asynchronously fetches the deserialized value together with its {@link Stat}.
     *
     * <p>If the lookup fails, invalidation of the path is requested on the cache; the failure is
     * still propagated through the returned future.
     *
     * @param str ZooKeeper path to read
     * @return future holding the value/stat pair, or an empty {@link Optional} when the
     *         underlying cache reports no data for the path
     */
    public CompletableFuture<Optional<Map.Entry<T, Stat>>> getWithStatAsync(String str) {
        return this.cache.getDataAsync(str, this, this).whenComplete((entry, th) -> {
            if (th != null) {
                this.cache.asyncInvalidate(str);
            }
        });
    }

    /**
     * Blocking variant of {@link #getAsync(String)}.
     *
     * @param str ZooKeeper path to read
     * @return the value, or an empty {@link Optional} when no data is reported for the path
     * @throws Exception if waiting for the asynchronous lookup fails or is interrupted
     */
    public Optional<T> get(String str) throws Exception {
        return getAsync(str).get();
    }

    /**
     * Synchronously fetches the deserialized value together with its {@link Stat}.
     *
     * @param str ZooKeeper path to read
     * @return the value/stat pair, or an empty {@link Optional} when no data is reported
     * @throws Exception if the underlying cache lookup fails
     */
    public Optional<Map.Entry<T, Stat>> getWithStat(String str) throws Exception {
        return this.cache.getData(str, this, this);
    }

    /** Invalidates all data entries of the underlying cache. */
    public void clear() {
        this.cache.invalidateAllData();
    }

    /**
     * Invalidates the data entry of the underlying cache for the given path.
     *
     * @param str ZooKeeper path whose cached data should be dropped
     */
    public void invalidate(String str) {
        this.cache.invalidateData(str);
    }

    /**
     * Invalidates the given path, reads it again and notifies every registered listener with
     * the fresh value and stat.
     *
     * <p>Nothing is notified when the node no longer exists. A failure while reloading is logged
     * and swallowed. A failure inside one listener is logged and does not prevent the remaining
     * listeners from being notified.
     *
     * @param str ZooKeeper path to reload
     */
    @Override // org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater
    public void reloadCache(String str) {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Reloading ZooKeeperDataCache at path {}", str);
            }
            this.cache.invalidate(str);
            Optional<Map.Entry<T, Stat>> data = this.cache.getData(str, this, this);
            if (!data.isPresent()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Node [{}] does not exist", str);
                }
                return;
            }
            Map.Entry<T, Stat> entry = data.get();
            for (ZooKeeperCacheListener<T> zooKeeperCacheListener : this.listeners) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Notifying listener {} at path {}", zooKeeperCacheListener, str);
                }
                try {
                    zooKeeperCacheListener.onUpdate(str, entry.getKey(), entry.getValue());
                } catch (Exception e) {
                    // Isolate listeners from each other: one faulty listener must not starve
                    // the listeners registered after it.
                    LOG.warn("Listener {} failed while handling update at path: {}",
                            zooKeeperCacheListener, str, e);
                    continue;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Notified listener {} at path {}", zooKeeperCacheListener, str);
                }
            }
        } catch (Exception e) {
            LOG.warn("Reloading ZooKeeperDataCache failed at path: {}", str, e);
        }
    }

    /**
     * Adds a listener to be notified by {@link #reloadCache(String)}.
     *
     * @param zooKeeperCacheListener listener to add
     */
    @Override // org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater
    public void registerListener(ZooKeeperCacheListener<T> zooKeeperCacheListener) {
        this.listeners.add(zooKeeperCacheListener);
    }

    /**
     * Removes a previously registered listener.
     *
     * @param zooKeeperCacheListener listener to remove
     */
    @Override // org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater
    public void unregisterListener(ZooKeeperCacheListener<T> zooKeeperCacheListener) {
        this.listeners.remove(zooKeeperCacheListener);
    }

    /**
     * Logs the watch event and, unless {@link #close()} has been called, forwards it to the
     * underlying cache with this instance as the updater.
     *
     * @param watchedEvent event delivered by ZooKeeper
     */
    @Override // org.apache.zookeeper.Watcher
    public void process(WatchedEvent watchedEvent) {
        LOG.info("[{}] Received ZooKeeper watch event: {}", this.cache.zkSession.get(), watchedEvent);
        if (IS_SHUTDOWN_UPDATER.get(this) == FALSE) {
            this.cache.process(watchedEvent, this);
        }
    }

    /**
     * Marks this data cache as shut down so that later watch events are no longer forwarded to
     * the underlying cache. The underlying cache itself is not closed here.
     */
    public void close() {
        IS_SHUTDOWN_UPDATER.set(this, TRUE);
    }
}
