/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.cache.impl;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeTypeRef;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.pulsar.shade.com.fasterxml.jackson.databind.JavaType;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MetadataCache} implementation backed by a Caffeine {@link AsyncLoadingCache}.
 *
 * <p>Entries are keyed by path and hold the deserialized value together with the stat returned by the
 * {@link MetadataStore}. Cache misses are loaded from the store and decoded with the configured
 * {@link MetadataSerde}. The class is also a {@code Consumer<Notification>}: notifications passed to
 * {@link #accept(Notification)} refresh or invalidate the entry for the notified path.
 *
 * @param <T> type of the cached values
 */
public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notification> {
    private static final Logger log = LoggerFactory.getLogger(MetadataCacheImpl.class);
    private final MetadataStore store;
    private final MetadataSerde<T> serde;
    private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> objCache;

    /**
     * Creates a cache whose values are (de)serialized through a {@link JSONMetadataSerdeTypeRef}.
     *
     * @param store       backing metadata store
     * @param typeRef     type reference describing {@code T}
     * @param cacheConfig refresh/expiry settings
     */
    public MetadataCacheImpl(MetadataStore store, TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
        this(store, new JSONMetadataSerdeTypeRef<T>(typeRef), cacheConfig);
    }

    /**
     * Creates a cache whose values are (de)serialized through a {@link JSONMetadataSerdeSimpleType}.
     *
     * @param store       backing metadata store
     * @param type        Jackson type describing {@code T}
     * @param cacheConfig refresh/expiry settings
     */
    public MetadataCacheImpl(MetadataStore store, JavaType type, MetadataCacheConfig cacheConfig) {
        this(store, new JSONMetadataSerdeSimpleType(type), cacheConfig);
    }

    /**
     * Creates a cache using an explicit serde.
     *
     * <p>{@code refreshAfterWrite} and {@code expireAfterWrite} are only applied to the cache builder when the
     * corresponding configured value is strictly positive.
     *
     * @param store       backing metadata store
     * @param serde       serializer/deserializer for {@code T}
     * @param cacheConfig refresh/expiry settings
     */
    public MetadataCacheImpl(final MetadataStore store, MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
        this.store = store;
        this.serde = serde;

        Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
        long refreshMillis = cacheConfig.getRefreshAfterWriteMillis();
        if (refreshMillis > 0L) {
            cacheBuilder.refreshAfterWrite(refreshMillis, TimeUnit.MILLISECONDS);
        }
        long expireMillis = cacheConfig.getExpireAfterWriteMillis();
        if (expireMillis > 0L) {
            cacheBuilder.expireAfterWrite(expireMillis, TimeUnit.MILLISECONDS);
        }

        this.objCache = cacheBuilder.buildAsync(new AsyncCacheLoader<String, Optional<CacheGetResult<T>>>() {

            @Override
            public CompletableFuture<Optional<CacheGetResult<T>>> asyncLoad(String key, Executor executor) {
                return MetadataCacheImpl.this.readValueFromStore(key);
            }

            @Override
            public CompletableFuture<Optional<CacheGetResult<T>>> asyncReload(
                    String key, Optional<CacheGetResult<T>> oldValue, Executor executor) {
                // Only go back to the store when it is an AbstractMetadataStore that reports being connected;
                // in every other case the previously cached value is kept.
                if (store instanceof AbstractMetadataStore && ((AbstractMetadataStore) store).isConnected()) {
                    return MetadataCacheImpl.this.readValueFromStore(key);
                }
                return CompletableFuture.completedFuture(oldValue);
            }
        });
    }

    /**
     * Reads {@code path} from the store and deserializes the payload.
     *
     * @param path key to read
     * @return a future holding the decoded value and its stat, an empty {@code Optional} when the store has no
     *         value for the path, or failing with {@code ContentDeserializationException} when decoding throws
     */
    private CompletableFuture<Optional<CacheGetResult<T>>> readValueFromStore(String path) {
        return this.store.get(path).thenCompose(optRes -> {
            if (!optRes.isPresent()) {
                return FutureUtils.value(Optional.empty());
            }
            try {
                GetResult res = optRes.get();
                T obj = this.serde.deserialize(path, res.getValue(), res.getStat());
                return FutureUtils.value(Optional.of(new CacheGetResult<T>(obj, res.getStat())));
            } catch (Throwable t) {
                return FutureUtils.exception(new MetadataStoreException.ContentDeserializationException(
                        "Failed to deserialize payload for key '" + path + "'", t));
            }
        });
    }

    /**
     * Returns the value for {@code path}, loading it through the cache if needed.
     */
    @Override
    public CompletableFuture<Optional<T>> get(String path) {
        return this.objCache.get(path).thenApply(cached -> cached.map(CacheGetResult::getValue));
    }

    /**
     * Returns the value for {@code path} together with its stat, loading it through the cache if needed.
     */
    @Override
    public CompletableFuture<Optional<CacheGetResult<T>>> getWithStats(String path) {
        return this.objCache.get(path);
    }

    /**
     * Returns the value for {@code path} only if a successfully completed entry is already cached.
     *
     * @return the cached value, or an empty {@code Optional} when the entry is absent, still loading,
     *         failed, or cached as "not present"
     */
    @Override
    public Optional<T> getIfCached(String path) {
        CompletableFuture<Optional<CacheGetResult<T>>> future = this.objCache.getIfPresent(path);
        if (future == null || !future.isDone() || future.isCompletedExceptionally()) {
            return Optional.empty();
        }
        // The future is already complete, so join() returns immediately.
        return future.join().map(CacheGetResult::getValue);
    }

    /**
     * Applies {@code modifyFunction} to a copy of the current value (or to an empty {@code Optional} when there
     * is none) and writes the result back with a version check. Expected version {@code -1} is used when no
     * value is cached for the path. The operation goes through {@link #executeWithRetry}.
     *
     * @param path           key to update or create
     * @param modifyFunction maps the current (optional) value to the new value
     * @return a future holding the object returned by {@code modifyFunction}
     */
    @Override
    public CompletableFuture<T> readModifyUpdateOrCreate(String path, Function<Optional<T>, T> modifyFunction) {
        return this.executeWithRetry(() -> this.objCache.get(path).thenCompose(optEntry -> {
            Optional<T> currentValue = Optional.empty();
            long expectedVersion = -1L;
            if (optEntry.isPresent()) {
                CacheGetResult<T> entry = optEntry.get();
                try {
                    currentValue = Optional.of(this.copyOf(path, entry));
                } catch (IOException e) {
                    return FutureUtils.exception(e);
                }
                expectedVersion = entry.getStat().getVersion();
            }

            T newValueObj;
            byte[] newValue;
            try {
                newValueObj = modifyFunction.apply(currentValue);
                newValue = this.serde.serialize(path, newValueObj);
            } catch (Throwable t) {
                return FutureUtils.exception(t);
            }
            return this.putAndRefresh(path, newValueObj, newValue, expectedVersion);
        }), path);
    }

    /**
     * Applies {@code modifyFunction} to a copy of the current value and writes the result back with a version
     * check. Fails with {@code NotFoundException} when there is no value for the path. The operation goes
     * through {@link #executeWithRetry}.
     *
     * @param path           key to update
     * @param modifyFunction maps the current value to the new value
     * @return a future holding the object returned by {@code modifyFunction}
     */
    @Override
    public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> modifyFunction) {
        return this.executeWithRetry(() -> this.objCache.get(path).thenCompose(optEntry -> {
            if (!optEntry.isPresent()) {
                return FutureUtils.exception(new MetadataStoreException.NotFoundException(""));
            }
            CacheGetResult<T> entry = optEntry.get();
            long expectedVersion = entry.getStat().getVersion();

            T newValueObj;
            byte[] newValue;
            try {
                T currentValue = this.copyOf(path, entry);
                newValueObj = modifyFunction.apply(currentValue);
                newValue = this.serde.serialize(path, newValueObj);
            } catch (Throwable t) {
                return FutureUtils.exception(t);
            }
            return this.putAndRefresh(path, newValueObj, newValue, expectedVersion);
        }), path);
    }

    /**
     * Serializes {@code value} and writes it with expected version {@code -1}. After a successful write the
     * path is loaded through the cache before the returned future completes.
     *
     * @return a future that fails with {@code AlreadyExistsException} when the store reports a
     *         {@code BadVersionException}, or with the underlying failure otherwise
     */
    @Override
    public CompletableFuture<Void> create(String path, T value) {
        byte[] content;
        try {
            content = this.serde.serialize(path, value);
        } catch (Throwable t) {
            return FutureUtils.exception(t);
        }

        CompletableFuture<Void> future = new CompletableFuture<>();
        this.store.put(path, content, Optional.of(-1L))
                .thenAccept(stat -> this.objCache.get(path).whenComplete((cached, ex) -> {
                    if (ex == null) {
                        future.complete(null);
                    } else {
                        log.error("Exception while getting path {}", path, ex);
                        // unwrap() never yields null, so the future is always completed.
                        future.completeExceptionally(unwrap(ex));
                    }
                }))
                .exceptionally(ex -> {
                    Throwable cause = unwrap(ex);
                    if (cause instanceof MetadataStoreException.BadVersionException) {
                        future.completeExceptionally(new MetadataStoreException.AlreadyExistsException(cause));
                    } else {
                        future.completeExceptionally(cause);
                    }
                    return null;
                });
        return future;
    }

    /**
     * Deletes {@code path} from the store without a version check. This method does not touch the cache itself.
     */
    @Override
    public CompletableFuture<Void> delete(String path) {
        return this.store.delete(path, Optional.empty());
    }

    /**
     * Delegates to {@link MetadataStore#exists(String)}; the cache is not consulted.
     */
    @Override
    public CompletableFuture<Boolean> exists(String path) {
        return this.store.exists(path);
    }

    /**
     * Delegates to {@link MetadataStore#getChildren(String)}; the cache is not consulted.
     */
    @Override
    public CompletableFuture<List<String>> getChildren(String path) {
        return this.store.getChildren(path);
    }

    /**
     * Drops the cached entry for {@code path}, if any.
     */
    @Override
    public void invalidate(String path) {
        this.objCache.synchronous().invalidate(path);
    }

    /**
     * Replaces the cached entry for {@code path} with a fresh read from the store. Paths that are not
     * currently cached are left untouched.
     */
    @Override
    public void refresh(String path) {
        this.objCache.asMap().computeIfPresent(path, (oldKey, oldValue) -> this.readValueFromStore(path));
    }

    /**
     * Drops every cached entry.
     */
    @Override
    @VisibleForTesting
    public void invalidateAll() {
        this.objCache.synchronous().invalidateAll();
    }

    /**
     * Reacts to a store notification: {@code Created}/{@code Modified} refresh the cached entry for the
     * notified path, {@code Deleted} invalidates it, and any other type is ignored.
     */
    @Override
    public void accept(Notification t) {
        String path = t.getPath();
        switch (t.getType()) {
            case Created:
            case Modified:
                this.refresh(path);
                break;
            case Deleted:
                this.objCache.synchronous().invalidate(path);
                break;
            default:
                // Other notification types do not affect cached values.
                break;
        }
    }

    /**
     * Runs {@code op}; if it fails with a {@code BadVersionException} the cached entry for {@code key} is
     * invalidated and {@code op} is run exactly one more time.
     *
     * @param op  supplier producing the asynchronous operation
     * @param key cache key to invalidate before the retry
     * @return a future completed with the operation's value, or failed with the unwrapped failure
     */
    private CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op, String key) {
        CompletableFuture<T> result = new CompletableFuture<>();
        op.get().thenAccept(result::complete).exceptionally(ex -> {
            Throwable cause = unwrap(ex);
            if (!(cause instanceof MetadataStoreException.BadVersionException)) {
                result.completeExceptionally(cause);
                return null;
            }
            try {
                // The cached version is stale: drop it so the retry reloads from the store.
                this.objCache.synchronous().invalidate(key);
                op.get().thenAccept(result::complete).exceptionally(retryEx -> {
                    result.completeExceptionally(unwrap(retryEx));
                    return null;
                });
            } catch (Throwable t) {
                // A synchronous failure here would otherwise be swallowed by the discarded
                // exceptionally() stage, leaving `result` incomplete forever.
                result.completeExceptionally(t);
            }
            return null;
        });
        return result;
    }

    /**
     * Writes {@code newValue} with the given expected version, refreshes the cache entry and yields
     * {@code newValueObj}.
     */
    private CompletableFuture<T> putAndRefresh(String path, T newValueObj, byte[] newValue, long expectedVersion) {
        return this.store.put(path, newValue, Optional.of(expectedVersion))
                .thenAccept(__ -> this.refresh(path))
                .thenApply(__ -> newValueObj);
    }

    /**
     * Produces an independent copy of a cached value by round-tripping it through the serde, so that the
     * caller's modify function does not operate on the instance held in the cache.
     */
    private T copyOf(String path, CacheGetResult<T> entry) throws IOException {
        return this.serde.deserialize(path, this.serde.serialize(path, entry.getValue()), entry.getStat());
    }

    /**
     * Strips the {@link CompletionException} wrapper added by {@link CompletableFuture} dependent stages.
     * Never returns {@code null}: a throwable that is not a wrapper, or a wrapper without a cause, is
     * returned as is.
     */
    private static Throwable unwrap(Throwable ex) {
        if (ex instanceof CompletionException && ex.getCause() != null) {
            return ex.getCause();
        }
        return ex;
    }

    /**
     * Returns the metadata store backing this cache.
     */
    public MetadataStore getStore() {
        return this.store;
    }
}

