package com.oracle.coherence.grpc.proxy.common.v0;

import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.Empty;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.grpc.BinaryHelper;
import com.oracle.coherence.grpc.ErrorsHelper;
import com.oracle.coherence.grpc.SafeStreamObserver;
import com.oracle.coherence.grpc.messages.cache.v0.AddIndexRequest;
import com.oracle.coherence.grpc.messages.cache.v0.AggregateRequest;
import com.oracle.coherence.grpc.messages.cache.v0.ClearRequest;
import com.oracle.coherence.grpc.messages.cache.v0.DestroyRequest;
import com.oracle.coherence.grpc.messages.cache.v0.GetAllRequest;
import com.oracle.coherence.grpc.messages.cache.v0.MapListenerRequest;
import com.oracle.coherence.grpc.messages.cache.v0.MapListenerResponse;
import com.oracle.coherence.grpc.messages.cache.v0.PutAllRequest;
import com.oracle.coherence.grpc.messages.cache.v0.RemoveIndexRequest;
import com.oracle.coherence.grpc.proxy.common.BaseGrpcServiceImpl;
import com.oracle.coherence.grpc.proxy.common.v0.NamedCacheService;
import com.oracle.coherence.grpc.v0.CacheRequestHolder;
import com.tangosol.internal.util.processor.BinaryProcessors;
import com.tangosol.io.Serializer;
import com.tangosol.net.AsyncNamedCache;
import com.tangosol.net.CacheService;
import com.tangosol.net.ConfigurableCacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.util.Binary;
import com.tangosol.util.Filter;
import com.tangosol.util.Filters;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.ValueExtractor;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:com/oracle/coherence/grpc/proxy/common/v0/BaseNamedCacheServiceImpl.class */
public abstract class BaseNamedCacheServiceImpl extends BaseGrpcServiceImpl implements NamedCacheService {
    public static final String MBEAN_NAME = "type=GrpcNamedCacheProxy";
    public static final String INVALID_REQUEST_MESSAGE = "invalid request, the request cannot be null";
    public static final String MISSING_PROCESSOR_MESSAGE = "the request does not contain a serialized entry processor";
    public static final String MISSING_EXTRACTOR_MESSAGE = "the request does not contain a serialized ValueExtractor";
    public static final String MISSING_AGGREGATOR_MESSAGE = "the request does not contain a serialized ValueExtractor";
    private final long m_nEventsHeartbeat;
    protected volatile boolean m_fClosed;
    protected final Lock f_lock;

    public BaseNamedCacheServiceImpl(NamedCacheService.Dependencies dependencies) {
        super(dependencies, MBEAN_NAME, "GrpcNamedCacheProxy");
        this.m_fClosed = false;
        this.f_lock = new ReentrantLock();
        this.m_nEventsHeartbeat = dependencies.getEventsHeartbeat();
    }

    @Override // com.oracle.coherence.grpc.proxy.common.v0.NamedCacheService
    public void close() {
        this.f_lock.lock();
        try {
            this.m_fClosed = true;
            this.f_listCloseable.forEach(closeable -> {
                try {
                    closeable.close();
                } catch (Exception e) {
                    Logger.err(e);
                }
            });
            this.f_listCloseable.clear();
        } finally {
            this.f_lock.unlock();
        }
    }

    protected Empty addIndex(CacheRequestHolder<AddIndexRequest, Void> cacheRequestHolder) {
        AddIndexRequest addIndexRequest = (AddIndexRequest) cacheRequestHolder.getRequest();
        NamedCache cache = cacheRequestHolder.getCache();
        Serializer serializer = cacheRequestHolder.getSerializer();
        cache.addIndex(ensureValueExtractor(addIndexRequest.getExtractor(), serializer), addIndexRequest.getSorted(), (Comparator) BinaryHelper.fromByteString(addIndexRequest.getComparator(), serializer));
        return BinaryHelper.EMPTY;
    }

    protected CompletionStage<BytesValue> aggregateWithFilter(AggregateRequest aggregateRequest, Executor executor) {
        return createHolderAsync(aggregateRequest, aggregateRequest.getScope(), aggregateRequest.getCache(), aggregateRequest.getFormat()).thenComposeAsync(cacheRequestHolder -> {
            return aggregateWithFilter((CacheRequestHolder<AggregateRequest, Void>) cacheRequestHolder, executor);
        }, executor).handleAsync((v0, v1) -> {
            return ResponseHandlers.handleError(v0, v1);
        }, executor);
    }

    protected CompletionStage<BytesValue> aggregateWithFilter(CacheRequestHolder<AggregateRequest, Void> cacheRequestHolder, Executor executor) {
        AggregateRequest aggregateRequest = (AggregateRequest) cacheRequestHolder.getRequest();
        ByteString filter = aggregateRequest.getFilter();
        return cacheRequestHolder.runAsync(cacheRequestHolder.getAsyncCache().aggregate(filter.isEmpty() ? Filters.always() : (Filter) BinaryHelper.fromByteString(filter, cacheRequestHolder.getSerializer()), (InvocableMap.EntryAggregator) BinaryHelper.fromByteString(aggregateRequest.getAggregator(), cacheRequestHolder.getSerializer()))).thenApplyAsync(cacheRequestHolder2 -> {
            return BinaryHelper.toBytesValue(cacheRequestHolder2.getResult(), cacheRequestHolder2.getSerializer());
        }, executor);
    }

    protected CompletionStage<BytesValue> aggregateWithKeys(AggregateRequest aggregateRequest, Executor executor) {
        return createHolderAsync(aggregateRequest, aggregateRequest.getScope(), aggregateRequest.getCache(), aggregateRequest.getFormat()).thenComposeAsync(cacheRequestHolder -> {
            return aggregateWithKeys((CacheRequestHolder<AggregateRequest, Void>) cacheRequestHolder, executor);
        }, executor).handleAsync((v0, v1) -> {
            return ResponseHandlers.handleError(v0, v1);
        }, executor);
    }

    protected CompletionStage<BytesValue> aggregateWithKeys(CacheRequestHolder<AggregateRequest, Void> cacheRequestHolder, Executor executor) {
        AggregateRequest aggregateRequest = (AggregateRequest) cacheRequestHolder.getRequest();
        Stream stream = aggregateRequest.getKeysList().stream();
        Objects.requireNonNull(cacheRequestHolder);
        return cacheRequestHolder.runAsync(cacheRequestHolder.getAsyncCache().aggregate((List) stream.map(cacheRequestHolder::convertKeyDown).collect(Collectors.toList()), (InvocableMap.EntryAggregator) BinaryHelper.fromByteString(aggregateRequest.getAggregator(), cacheRequestHolder.getSerializer()))).thenApplyAsync(cacheRequestHolder2 -> {
            return BinaryHelper.toBytesValue(cacheRequestHolder2.getResult(), cacheRequestHolder2.getSerializer());
        }, executor);
    }

    @Override // com.oracle.coherence.grpc.proxy.common.v0.NamedCacheService
    public void clear(ClearRequest clearRequest, StreamObserver<Empty> streamObserver) {
        StreamObserver ensureSafeObserver = SafeStreamObserver.ensureSafeObserver(streamObserver);
        try {
            getPassThroughCache(clearRequest.getScope(), clearRequest.getCache()).clear();
            ResponseHandlers.handleUnary(Empty.getDefaultInstance(), null, ensureSafeObserver);
        } catch (Throwable th) {
            ResponseHandlers.handleUnary(Empty.getDefaultInstance(), th, ensureSafeObserver);
        }
    }

    @Override // com.oracle.coherence.grpc.proxy.common.v0.NamedCacheService
    public void destroy(DestroyRequest destroyRequest, StreamObserver<Empty> streamObserver) {
        StreamObserver ensureSafeObserver = SafeStreamObserver.ensureSafeObserver(streamObserver);
        String cache = destroyRequest.getCache();
        if (cache != null) {
            try {
                if (!cache.trim().isEmpty()) {
                    Logger.finer("Destroying cache " + cache);
                    ConfigurableCacheFactory ccf = getCCF(destroyRequest.getScope());
                    ccf.destroyCache(ccf.ensureCache(cache, (ClassLoader) null));
                    ResponseHandlers.handleUnary(Empty.getDefaultInstance(), null, ensureSafeObserver);
                    Logger.info("Destroyed cache " + cache);
                    return;
                }
            } catch (Throwable th) {
                Logger.err("Caught exception destroying cache \"" + cache + "\"", th);
                ResponseHandlers.handleUnary(Empty.getDefaultInstance(), th, ensureSafeObserver);
                return;
            }
        }
        throw Status.INVALID_ARGUMENT.withDescription(BaseGrpcServiceImpl.INVALID_CACHE_NAME_MESSAGE).asRuntimeException();
    }

    @Override // com.oracle.coherence.grpc.proxy.common.v0.NamedCacheService
    public StreamObserver<MapListenerRequest> events(StreamObserver<MapListenerResponse> streamObserver) {
        this.f_lock.lock();
        try {
            if (this.m_fClosed) {
                throw Status.UNAVAILABLE.asRuntimeException();
            }
            MapListenerProxy mapListenerProxy = new MapListenerProxy(this, SafeStreamObserver.ensureSafeObserver(streamObserver), this.m_nEventsHeartbeat);
            addCloseable(mapListenerProxy);
            this.f_lock.unlock();
            return mapListenerProxy;
        } catch (Throwable th) {
            this.f_lock.unlock();
            throw th;
        }
    }

    protected List<Binary> convertKeysToBinary(CacheRequestHolder<GetAllRequest, Void> cacheRequestHolder) {
        Stream stream = ((GetAllRequest) cacheRequestHolder.getRequest()).getKeyList().stream();
        Objects.requireNonNull(cacheRequestHolder);
        return (List) stream.map(cacheRequestHolder::convertKeyDown).collect(Collectors.toList());
    }

    protected CompletionStage<Empty> partitionedPutAll(CacheRequestHolder<PutAllRequest, Void> cacheRequestHolder, Map<Binary, Binary> map) {
        try {
            HashMap hashMap = new HashMap();
            PartitionedService cacheService = cacheRequestHolder.getCache().getCacheService();
            for (Map.Entry<Binary, Binary> entry : map.entrySet()) {
                Binary key = entry.getKey();
                ((Map) hashMap.computeIfAbsent(cacheService.getKeyOwner(key), member -> {
                    return new HashMap();
                })).put(key, entry.getValue());
            }
            AsyncNamedCache asyncCache = cacheRequestHolder.getAsyncCache();
            long ttl = ((PutAllRequest) cacheRequestHolder.getRequest()).getTtl();
            return CompletableFuture.allOf((CompletableFuture[]) hashMap.values().stream().map(map2 -> {
                return plainPutAll(asyncCache, map2, ttl);
            }).map((v0) -> {
                return v0.toCompletableFuture();
            }).toArray(i -> {
                return new CompletableFuture[i];
            })).thenApply(r2 -> {
                return BinaryHelper.EMPTY;
            });
        } catch (Throwable th) {
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.completeExceptionally(th);
            return completableFuture;
        }
    }

    protected CompletionStage<Empty> plainPutAll(AsyncNamedCache<Binary, Binary> asyncNamedCache, Map<Binary, Binary> map, long j) {
        return asyncNamedCache.invokeAll(map.keySet(), BinaryProcessors.putAll(map, j)).thenApplyAsync(map2 -> {
            return BinaryHelper.EMPTY;
        }, this.f_executor);
    }

    protected Empty removeIndex(CacheRequestHolder<RemoveIndexRequest, Void> cacheRequestHolder) {
        cacheRequestHolder.getCache().removeIndex(ensureValueExtractor(((RemoveIndexRequest) cacheRequestHolder.getRequest()).getExtractor(), cacheRequestHolder.getSerializer()));
        return BinaryHelper.EMPTY;
    }

    protected <V> Empty empty(V v) {
        return BinaryHelper.EMPTY;
    }

    protected Empty execute(Runnable runnable) {
        runnable.run();
        return BinaryHelper.EMPTY;
    }

    protected <T> T execute(Callable<T> callable) {
        try {
            return callable.call();
        } catch (Throwable th) {
            throw ErrorsHelper.ensureStatusRuntimeException(th);
        }
    }

    protected BoolValue toBoolValue(Binary binary, Serializer serializer) {
        return BoolValue.of(((Boolean) BinaryHelper.fromBinary(binary, serializer)).booleanValue());
    }

    public ValueExtractor<?, ?> ensureValueExtractor(ByteString byteString, Serializer serializer) {
        if (byteString == null || byteString.isEmpty()) {
            throw Status.INVALID_ARGUMENT.withDescription("the request does not contain a serialized ValueExtractor").asRuntimeException();
        }
        return (ValueExtractor) BinaryHelper.fromByteString(byteString, serializer);
    }

    @Override // com.oracle.coherence.grpc.proxy.common.v0.NamedCacheService
    public <T> Filter<T> ensureFilter(ByteString byteString, Serializer serializer) {
        return (byteString == null || byteString.isEmpty()) ? Filters.always() : (Filter) BinaryHelper.fromByteString(byteString, serializer);
    }

    @Override // com.oracle.coherence.grpc.proxy.common.v0.NamedCacheService
    public <T> Filter<T> getFilter(ByteString byteString, Serializer serializer) {
        if (byteString == null || byteString.isEmpty()) {
            return null;
        }
        return (Filter) BinaryHelper.fromByteString(byteString, serializer);
    }

    public <T> Comparator<T> deserializeComparator(ByteString byteString, Serializer serializer) {
        if (byteString == null || byteString.isEmpty()) {
            return null;
        }
        return (Comparator) BinaryHelper.fromByteString(byteString, serializer);
    }

    protected CompletionStage<AsyncNamedCache<Binary, Binary>> getAsyncCache(String str, String str2) {
        return CompletableFuture.supplyAsync(() -> {
            return getPassThroughCache(str, str2).async();
        }, this.f_executor);
    }

    protected InvocableMap.EntryProcessor<Binary, Binary, Binary> castProcessor(InvocableMap.EntryProcessor<Binary, Binary, ?> entryProcessor) {
        return entryProcessor;
    }

    public <Req> CompletionStage<CacheRequestHolder<Req, Void>> createHolderAsync(Req req, String str, String str2, String str3) {
        return CompletableFuture.supplyAsync(() -> {
            return createRequestHolder(req, str, str2, str3);
        }, this.f_executor);
    }

    @Override // com.oracle.coherence.grpc.proxy.common.v0.NamedCacheService
    public <Req> CacheRequestHolder<Req, Void> createRequestHolder(Req req, String str, String str2, String str3) {
        if (req == null) {
            throw Status.INVALID_ARGUMENT.withDescription(INVALID_REQUEST_MESSAGE).asRuntimeException();
        }
        if (str2 == null || str2.isEmpty()) {
            throw Status.INVALID_ARGUMENT.withDescription(BaseGrpcServiceImpl.INVALID_CACHE_NAME_MESSAGE).asRuntimeException();
        }
        NamedCache<Binary, Binary> cache = getCache(str, str2, true);
        NamedCache<Binary, Binary> cache2 = getCache(str, str2, false);
        AsyncNamedCache async = cache.async();
        CacheService cacheService = async.getNamedCache().getCacheService();
        String cacheFormat = CacheRequestHolder.getCacheFormat(cacheService);
        Objects.requireNonNull(cacheService);
        Supplier<Serializer> supplier = cacheService::getSerializer;
        Objects.requireNonNull(cacheService);
        return new CacheRequestHolder<>(req, async, () -> {
            return cache2;
        }, str3, getSerializer(str3, cacheFormat, supplier, cacheService::getContextClassLoader), this.f_executor);
    }
}
