/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.impl;

import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.pulsar.shade.com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Skeleton for metadata store implementations.
 *
 * <p>This class keeps two asynchronous caches in front of the concrete store (one for
 * {@link #getChildren(String)} and one for {@link #exists(String)}), tracks the
 * {@link MetadataCacheImpl} instances it hands out, and fans out notifications and session
 * events to registered listeners. It registers itself as a notification listener so that
 * incoming notifications invalidate the relevant cache entries (see {@link #accept(Notification)}).
 *
 * <p>Subclasses supply the actual store access through {@link #getChildrenFromStore(String)},
 * {@link #existsFromStore(String)}, {@link #storeDelete(String, Optional)} and
 * {@link #storePut(String, byte[], Optional, EnumSet)}.
 */
public abstract class AbstractMetadataStore implements MetadataStoreExtended, Consumer<Notification> {
    private static final Logger log = LoggerFactory.getLogger(AbstractMetadataStore.class);

    /** Interval after which a cached entry becomes eligible for refresh (5 minutes, in milliseconds). */
    private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5L);

    private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
    private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();

    /** Single-threaded executor used to dispatch notifications and to run tasks passed to {@link #execute}. */
    private final ExecutorService executor;

    private final AsyncLoadingCache<String, List<String>> childrenCache;
    private final AsyncLoadingCache<String, Boolean> existsCache;

    /** Every metadata cache created through the {@code getMetadataCache} overloads. */
    private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();

    // NOTE(review): this flag is written in receivedSessionEvent() and read from the cache reload
    // callbacks without volatile or any other synchronization. Confirm whether a stale read is
    // acceptable here before changing it.
    private boolean isConnected = true;

    /**
     * Reads the children of {@code path} directly from the underlying store, bypassing the cache.
     *
     * @param var1 the path whose children are requested
     * @return a future holding the child names
     */
    protected abstract CompletableFuture<List<String>> getChildrenFromStore(String var1);

    /**
     * Checks directly against the underlying store whether {@code path} exists, bypassing the cache.
     *
     * @param var1 the path to check
     * @return a future holding the existence flag
     */
    protected abstract CompletableFuture<Boolean> existsFromStore(String var1);

    /**
     * Creates the dispatch executor, registers this instance as a notification listener and builds
     * the children and exists caches.
     *
     * <p>Both caches load through the corresponding {@code *FromStore} method. On a refresh they
     * only go back to the store while {@link #isConnected()} is true; otherwise the previously
     * cached value is returned unchanged.
     */
    protected AbstractMetadataStore() {
        this.executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("metadata-store"));
        // NOTE(review): overridable method invoked from the constructor; kept as-is so that
        // subclasses overriding registerListener() observe the same call as before.
        registerListener(this);

        this.childrenCache = Caffeine.newBuilder()
                .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
                .buildAsync(new AsyncCacheLoader<String, List<String>>() {
                    @Override
                    public CompletableFuture<List<String>> asyncLoad(String key, Executor executor) {
                        return AbstractMetadataStore.this.getChildrenFromStore(key);
                    }

                    @Override
                    public CompletableFuture<List<String>> asyncReload(String key, List<String> oldValue,
                                                                       Executor executor) {
                        if (!AbstractMetadataStore.this.isConnected) {
                            // Not connected: keep serving the value we already have.
                            return CompletableFuture.completedFuture(oldValue);
                        }
                        return AbstractMetadataStore.this.getChildrenFromStore(key);
                    }
                });

        this.existsCache = Caffeine.newBuilder()
                .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
                .buildAsync(new AsyncCacheLoader<String, Boolean>() {
                    @Override
                    public CompletableFuture<Boolean> asyncLoad(String key, Executor executor) {
                        return AbstractMetadataStore.this.existsFromStore(key);
                    }

                    @Override
                    public CompletableFuture<Boolean> asyncReload(String key, Boolean oldValue, Executor executor) {
                        if (!AbstractMetadataStore.this.isConnected) {
                            // Not connected: keep serving the value we already have.
                            return CompletableFuture.completedFuture(oldValue);
                        }
                        return AbstractMetadataStore.this.existsFromStore(key);
                    }
                });
    }

    /**
     * Creates a metadata cache for values of the given class and tracks it in this store.
     *
     * @param clazz the value class
     * @return the newly created cache
     */
    @Override
    public <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
        MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>((MetadataStore) this,
                TypeFactory.defaultInstance().constructSimpleType(clazz, null));
        this.metadataCaches.add(metadataCache);
        return metadataCache;
    }

    /**
     * Creates a metadata cache for values described by the given type reference and tracks it in
     * this store.
     *
     * @param typeRef the value type reference
     * @return the newly created cache
     */
    @Override
    public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef) {
        MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>((MetadataStore) this, typeRef);
        this.metadataCaches.add(metadataCache);
        return metadataCache;
    }

    /**
     * Creates a metadata cache that uses the given serde and tracks it in this store.
     *
     * @param serde the serializer/deserializer for cached values
     * @return the newly created cache
     */
    @Override
    public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde) {
        MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>((MetadataStore) this, serde);
        this.metadataCaches.add(metadataCache);
        return metadataCache;
    }

    /**
     * Returns the children of {@code path}, served through the children cache.
     *
     * @param path the parent path
     * @return a future holding the child names, or a future failed with
     *         {@link MetadataStoreException.InvalidPathException} when the path is not valid
     */
    @Override
    public final CompletableFuture<List<String>> getChildren(String path) {
        if (!isValidPath(path)) {
            return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
        }
        return this.childrenCache.get(path);
    }

    /**
     * Returns whether {@code path} exists, served through the exists cache.
     *
     * @param path the path to check
     * @return a future holding the existence flag, or a future failed with
     *         {@link MetadataStoreException.InvalidPathException} when the path is not valid
     */
    @Override
    public final CompletableFuture<Boolean> exists(String path) {
        if (!isValidPath(path)) {
            return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
        }
        return this.existsCache.get(path);
    }

    /**
     * Adds a listener that will be invoked for every notification passed to
     * {@link #receivedNotification(Notification)}.
     *
     * @param listener the listener to add
     */
    @Override
    public void registerListener(Consumer<Notification> listener) {
        this.listeners.add(listener);
    }

    /**
     * Delivers {@code notification} to every registered listener on this store's executor.
     *
     * <p>A listener that throws is logged and skipped; the remaining listeners still run.
     *
     * @param notification the notification to deliver
     * @return a future completed once all listeners have been invoked, or a failed future when the
     *         executor rejects the dispatch task
     */
    protected CompletableFuture<Void> receivedNotification(Notification notification) {
        try {
            return CompletableFuture.runAsync(() -> {
                for (Consumer<Notification> listener : this.listeners) {
                    try {
                        listener.accept(notification);
                    } catch (Throwable t) {
                        log.error("Failed to process metadata store notification", t);
                    }
                }
            }, this.executor);
        } catch (RejectedExecutionException e) {
            return FutureUtil.failedFuture(e);
        }
    }

    /**
     * Reacts to a notification by invalidating the affected cache entries.
     *
     * <ul>
     *   <li>Created / Deleted: the exists entry for the path and the children entry of its parent.</li>
     *   <li>ChildrenChanged: the children entry for the path.</li>
     *   <li>Created / Deleted / Modified: the notification is forwarded to every tracked metadata cache.</li>
     * </ul>
     *
     * @param n the notification to process
     */
    @Override
    public void accept(Notification n) {
        String path = n.getPath();
        NotificationType type = n.getType();
        boolean createdOrDeleted = type == NotificationType.Created || type == NotificationType.Deleted;

        if (createdOrDeleted) {
            invalidateExistsAndParentChildren(path);
        }
        if (type == NotificationType.ChildrenChanged) {
            this.childrenCache.synchronous().invalidate(path);
        }
        if (createdOrDeleted || type == NotificationType.Modified) {
            for (MetadataCacheImpl<?> cache : this.metadataCaches) {
                cache.accept(n);
            }
        }
    }

    /**
     * Deletes {@code path} in the underlying store.
     *
     * @param var1 the path to delete
     * @param var2 the expected version, if any
     * @return a future completed when the store has processed the delete
     */
    protected abstract CompletableFuture<Void> storeDelete(String var1, Optional<Long> var2);

    /**
     * Deletes {@code path} and, once the store reports success, invalidates the exists entry for
     * the path, the children entry of its parent and the path in every tracked metadata cache.
     *
     * @param path the path to delete
     * @param expectedVersion the expected version, if any
     * @return a future completed after the delete and the cache invalidation, or a future failed
     *         with {@link MetadataStoreException.InvalidPathException} when the path is not valid
     */
    @Override
    public final CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
        if (!isValidPath(path)) {
            return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
        }
        return storeDelete(path, expectedVersion).thenRun(() -> {
            invalidateExistsAndParentChildren(path);
            invalidateInMetadataCaches(path);
        });
    }

    /**
     * Deletes {@code path} and everything below it.
     *
     * <p>Children are deleted first (recursively, as {@code path + "/" + child}); afterwards the
     * path itself is deleted only if {@link #exists(String)} still reports it as present.
     *
     * @param path the root of the subtree to delete
     * @return a future completed when the whole subtree has been processed
     */
    @Override
    public CompletableFuture<Void> deleteRecursive(String path) {
        return getChildren(path)
                .thenCompose(children -> FutureUtil.waitForAll(
                        children.stream()
                                .map(child -> deleteRecursive(path + "/" + child))
                                .collect(Collectors.toList())))
                .thenCompose(ignored -> exists(path))
                .thenCompose(present -> {
                    if (present) {
                        return delete(path, Optional.empty());
                    }
                    return CompletableFuture.completedFuture(null);
                });
    }

    /**
     * Writes {@code data} at {@code path} in the underlying store.
     *
     * @param var1 the path to write
     * @param var2 the payload
     * @param var3 the expected version, if any
     * @param var4 the creation options
     * @return a future holding the resulting {@link Stat}
     */
    protected abstract CompletableFuture<Stat> storePut(String var1, byte[] var2, Optional<Long> var3,
                                                        EnumSet<CreateOption> var4);

    /**
     * Writes {@code data} at {@code path} and invalidates the affected cache entries on success.
     *
     * <p>A resulting stat version of {@code 0} is treated as a creation: in that case the exists
     * entry for the path and the children entry of its parent are invalidated too. The path is
     * always invalidated in every tracked metadata cache.
     *
     * @param path the path to write
     * @param data the payload
     * @param optExpectedVersion the expected version, if any
     * @param options the creation options
     * @return a future holding the resulting {@link Stat}, or a future failed with
     *         {@link MetadataStoreException.InvalidPathException} when the path is not valid
     */
    @Override
    public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long> optExpectedVersion,
                                             EnumSet<CreateOption> options) {
        if (!isValidPath(path)) {
            return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
        }
        return storePut(path, data, optExpectedVersion, options).thenApply(stat -> {
            boolean created = stat.getVersion() == 0L;
            if (created) {
                invalidateExistsAndParentChildren(path);
            }
            invalidateInMetadataCaches(path);
            return stat;
        });
    }

    /**
     * Adds a listener that will be invoked for every event passed to
     * {@link #receivedSessionEvent(SessionEvent)}.
     *
     * @param listener the listener to add
     */
    @Override
    public void registerSessionListener(Consumer<SessionEvent> listener) {
        this.sessionListeners.add(listener);
    }

    /**
     * Records the connection state carried by {@code event} and delivers the event to every
     * session listener on the calling thread.
     *
     * <p>A listener that throws is logged and skipped; the remaining listeners still run.
     *
     * @param event the session event to process
     */
    protected void receivedSessionEvent(SessionEvent event) {
        this.isConnected = event.isConnected();
        for (Consumer<SessionEvent> listener : this.sessionListeners) {
            try {
                listener.accept(event);
            } catch (Throwable t) {
                log.warn("Error in processing session event", t);
            }
        }
    }

    /**
     * Shuts the executor down immediately and waits up to 10 seconds for it to terminate.
     *
     * @throws Exception if the wait for termination is interrupted
     */
    @Override
    public void close() throws Exception {
        this.executor.shutdownNow();
        this.executor.awaitTermination(10L, TimeUnit.SECONDS);
    }

    /**
     * Drops every entry from the children and exists caches. The tracked metadata caches are not
     * touched.
     */
    @VisibleForTesting
    public void invalidateAll() {
        this.childrenCache.synchronous().invalidateAll();
        this.existsCache.synchronous().invalidateAll();
    }

    /**
     * Submits {@code task} to this store's executor; if submission throws, {@code future} is
     * completed exceptionally with that failure instead of propagating it to the caller.
     *
     * @param task the work to run
     * @param future the future to fail when the task cannot be submitted
     */
    protected void execute(Runnable task, CompletableFuture<?> future) {
        try {
            this.executor.execute(task);
        } catch (Throwable t) {
            future.completeExceptionally(t);
        }
    }

    /**
     * Returns the parent of {@code path}: everything before its last {@code '/'}.
     *
     * @param path the path to inspect
     * @return the parent path, or {@code null} when the last {@code '/'} is at index 0 or the path
     *         contains no {@code '/'} at all
     */
    protected static String parent(String path) {
        int idx = path.lastIndexOf('/');
        if (idx <= 0) {
            return null;
        }
        return path.substring(0, idx);
    }

    /**
     * Tells whether {@code path} is acceptable: either exactly {@code "/"}, or a non-blank string
     * that starts with {@code '/'} and does not end with {@code '/'}.
     *
     * @param path the path to validate
     * @return {@code true} when the path is valid
     */
    static boolean isValidPath(String path) {
        return StringUtils.equals(path, "/")
                || (StringUtils.isNotBlank(path) && path.startsWith("/") && !path.endsWith("/"));
    }

    /**
     * Returns the connection state recorded from the most recent session event ({@code true}
     * until the first event arrives).
     *
     * @return the last recorded connection state
     */
    public boolean isConnected() {
        return this.isConnected;
    }

    /** Invalidates the exists entry for {@code path} and, when it has a parent, that parent's children entry. */
    private void invalidateExistsAndParentChildren(String path) {
        this.existsCache.synchronous().invalidate(path);
        String parent = parent(path);
        if (parent != null) {
            this.childrenCache.synchronous().invalidate(parent);
        }
    }

    /** Invalidates {@code path} in every metadata cache created by this store. */
    private void invalidateInMetadataCaches(String path) {
        for (MetadataCacheImpl<?> cache : this.metadataCaches) {
            cache.invalidate(path);
        }
    }
}

