package org.infinispan.persistence.async;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Function;
import java.lang.invoke.MethodHandles;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.IllegalLifecycleStateException;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.configuration.cache.AsyncStoreConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.PersistenceConfiguration;
import org.infinispan.configuration.cache.StoreConfiguration;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.NonBlockingStore;
import org.infinispan.persistence.support.DelegatingNonBlockingStore;
import org.infinispan.persistence.support.SegmentPublisherWrapper;
import org.infinispan.security.actions.SecurityActions;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

/* loaded from: input_file:org/infinispan/persistence/async/AsyncNonBlockingStore.class */
public class AsyncNonBlockingStore<K, V> extends DelegatingNonBlockingStore<K, V> {
    /** Class logger; assigned in the static initializer at the end of the class. */
    private static final Log log;

    /** The wrapped store that queued modifications are eventually written to. */
    private final NonBlockingStore<K, V> actual;

    /** Executor on which writers that had to wait for a batch to finish are resumed. */
    private Executor nonBlockingExecutor;

    /** Segment count taken from the cache configuration, or 1 when the store is not segmented. */
    private int segmentCount;

    /** Once queued plus in-flight modifications exceed this value, writers wait for the current batch. */
    private int modificationQueueSize;

    /** Source of the connection-attempt count and availability interval used when retrying. */
    private PersistenceConfiguration persistenceConfiguration;

    /** Async-specific settings (queue size, fail-silently flag). */
    private AsyncStoreConfiguration asyncConfiguration;

    /** Scheduler used to delay retries against the delegate. */
    private ScheduledExecutorService scheduler;

    /** Non-null while a batch is being written to the delegate; completed when that batch finishes. */
    @GuardedBy("this")
    private CompletableFuture<Void> batchFuture;

    /** Non-null while the delegate has reported itself unavailable; completed when it is available again. */
    @GuardedBy("this")
    private CompletableFuture<Void> delegateAvailableFuture;

    /** True when a clear has been queued for the next batch. */
    @GuardedBy("this")
    private boolean hasPendingClear;

    /** True while a clear taken from the queue is being applied to the delegate. */
    @GuardedBy("this")
    private boolean isReplicatingClear;

    /** Assertion switch emitted by the compiler; assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /** Modifications queued for the next batch; looked up with keys passed through wrapKeyIfNeeded. */
    @GuardedBy("this")
    private Map<Object, Modification> pendingModifications = new HashMap<>();

    /** Modifications of the batch currently being written; the reference is swapped when a batch starts. */
    @GuardedBy("this")
    private Map<Object, Modification> replicatingModifications = Collections.emptyMap();

    /** True until start() runs and again after stop() has completed. */
    private volatile boolean stopped = true;

    /**
     * A future that is also a {@link Runnable}: when run, it invokes the supplied
     * operation and mirrors that operation's outcome (success or failure) onto itself.
     * This lets a delayed retry be handed to a scheduler while callers hold a stage
     * for its eventual result.
     */
    private static class RunnableCompletionStage extends CompletableFuture<Void> implements Runnable {
        private final Supplier<CompletionStage<Void>> supplier;

        private RunnableCompletionStage(Supplier<CompletionStage<Void>> supplier) {
            this.supplier = supplier;
        }

        /** Starts the wrapped operation and completes this future with its result. */
        @Override
        public void run() {
            CompletionStage<Void> operation = supplier.get();
            operation.whenComplete((ignore, failure) -> {
                if (failure == null) {
                    complete(null);
                    return;
                }
                completeExceptionally(failure);
            });
        }
    }

    /**
     * Creates an asynchronous wrapper around the given store. The wrapper is
     * considered stopped until {@code start} has been invoked.
     *
     * @param nonBlockingStore the store that queued modifications are written to
     */
    public AsyncNonBlockingStore(NonBlockingStore<K, V> nonBlockingStore) {
        this.actual = nonBlockingStore;
    }

    /**
     * Returns the characteristics reported by the superclass unchanged.
     *
     * @return the superclass's characteristic set
     */
    @Override // org.infinispan.persistence.support.DelegatingNonBlockingStore, org.infinispan.persistence.spi.NonBlockingStore
    public Set<NonBlockingStore.Characteristic> characteristics() {
        // Pure pass-through; this override adds no behavior of its own.
        return super.characteristics();
    }

    /**
     * Captures the configuration and executors this wrapper needs, marks the
     * wrapper as running and then starts the delegate.
     *
     * @param initializationContext context supplying the cache, store configuration and executors
     * @return the stage returned by the delegate's {@code start}
     */
    @Override
    public CompletionStage<Void> start(InitializationContext initializationContext) {
        Configuration cacheConfig = initializationContext.getCache().getCacheConfiguration();
        persistenceConfiguration = cacheConfig.persistence();

        // The timeout scheduler is used to delay retries when the delegate is failing.
        scheduler = (ScheduledExecutorService) SecurityActions
                .getGlobalComponentRegistry(initializationContext.getCache().getCacheManager())
                .getComponent(ScheduledExecutorService.class, KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR);
        if (!$assertionsDisabled && scheduler == null) {
            throw new AssertionError();
        }

        StoreConfiguration storeConfig = initializationContext.getConfiguration();
        if (storeConfig.segmented()) {
            segmentCount = cacheConfig.clustering().hash().numSegments();
        } else {
            // A non-segmented delegate is treated as having a single segment.
            segmentCount = 1;
        }
        asyncConfiguration = storeConfig.async();
        modificationQueueSize = asyncConfiguration.modificationQueueSize();
        nonBlockingExecutor = initializationContext.getNonBlockingExecutor();

        // Operations are accepted from this point on, before the delegate's start stage completes.
        stopped = false;
        return actual.start(initializationContext);
    }

    /**
     * Waits for all queued and in-flight batches to finish, then marks this
     * wrapper as stopped and stops the delegate.
     *
     * @return a stage that completes when the delegate has stopped
     */
    @Override
    public CompletionStage<Void> stop() {
        if (log.isTraceEnabled()) {
            log.tracef("Stopping async store containing store %s", actual);
        }
        CompletionStage<Void> drained = awaitQuiescence();
        return drained.thenCompose(ignore -> {
            if (log.isTraceEnabled()) {
                log.tracef("Stopping store %s from async store", actual);
            }
            // The flag flips only after draining, so operations are still accepted while batches finish.
            stopped = true;
            return actual.stop();
        });
    }

    /**
     * Returns a stage that completes once no batch is in flight. If a batch is
     * running, waits for it and then checks again, since its completion may have
     * started a follow-up batch.
     */
    private CompletionStage<Void> awaitQuiescence() {
        final CompletableFuture<Void> inFlight;
        synchronized (this) {
            inFlight = batchFuture;
        }
        if (inFlight != null) {
            if (log.isTraceEnabled()) {
                log.tracef("Must wait until prior batch completes for %s", actual);
            }
            return inFlight.thenCompose(ignore -> awaitQuiescence());
        }
        return CompletableFutures.completedNull();
    }

    /**
     * Queues a modification for the next batch, replacing any modification
     * already queued under the same key.
     * NOTE(review): performs no locking itself; presumably invoked from
     * {@code Modification.apply} while the store's monitor is held - confirm.
     *
     * @param obj          key under which the modification is stored
     * @param modification the modification to queue
     */
    public void putModification(Object obj, Modification modification) {
        pendingModifications.put(obj, modification);
    }

    /**
     * Queues a clear: everything queued so far is discarded (the clear supersedes
     * it) and the pending-clear flag is raised.
     * NOTE(review): performs no locking itself; presumably invoked from
     * {@code Modification.apply} while the store's monitor is held - confirm.
     */
    public void putClearModification() {
        hasPendingClear = true;
        pendingModifications.clear();
    }

    /**
     * Starts writing the currently queued modifications to the delegate as one batch.
     * <p>
     * The queue is swapped for a fresh map under the lock, so callers can keep queueing
     * while the batch is written. A queued clear is sent first, followed by the
     * write/remove operations. When the batch finishes, successfully or not,
     * {@code batchFuture} is completed and a new batch is started immediately if more
     * work was queued in the meantime.
     */
    private void submitTask() {
        Map<Object, Modification> nextQueue = new HashMap<>();
        if (log.isTraceEnabled()) {
            log.tracef("Starting new batch with id %s", System.identityHashCode(nextQueue));
        }
        final Map<Object, Modification> batch;
        final boolean clearFirst;
        synchronized (this) {
            // Only one batch may be in flight at any time.
            if (!$assertionsDisabled && (!replicatingModifications.isEmpty() || isReplicatingClear)) {
                throw new AssertionError();
            }
            batch = pendingModifications;
            replicatingModifications = batch;
            pendingModifications = nextQueue;
            clearFirst = hasPendingClear;
            isReplicatingClear = clearFirst;
            hasPendingClear = false;
        }

        CompletionStage<Void> batchStage;
        if (clearFirst) {
            if (log.isTraceEnabled()) {
                log.tracef("Sending clear to underlying store for id %s", System.identityHashCode(batch));
            }
            batchStage = retry(actual::clear, persistenceConfiguration.connectionAttempts())
                    .whenComplete((ignore, failure) -> {
                        synchronized (this) {
                            isReplicatingClear = false;
                        }
                    });
        } else {
            batchStage = CompletableFutures.completedNull();
        }

        if (!batch.isEmpty()) {
            batchStage = batchStage.thenCompose(ignore -> {
                if (log.isTraceEnabled()) {
                    log.tracef("Sending batch of %d write/remove operations to underlying store with id %s", batch.size(), System.identityHashCode(batch));
                }
                return retry(() -> replicateModifications(batch), persistenceConfiguration.connectionAttempts())
                        .whenComplete((ignore2, failure) -> {
                            synchronized (this) {
                                replicatingModifications = Collections.emptyMap();
                            }
                        });
            });
        }

        batchStage.whenComplete((ignore, failure) -> {
            if (log.isTraceEnabled()) {
                log.tracef("Async operations completed for id %s", System.identityHashCode(batch));
            }
            final boolean moreWork;
            final CompletableFuture<Void> finished;
            synchronized (this) {
                // If the clear failed, the write stage above never ran and so never reset
                // the in-flight map. Reset here unconditionally so a failed batch cannot
                // leave stale entries that reads would serve and the queue limit would count.
                replicatingModifications = Collections.emptyMap();
                isReplicatingClear = false;

                moreWork = !pendingModifications.isEmpty() || hasPendingClear;
                finished = batchFuture;
                // Install the next batch's future before completing this one, so waiters
                // released below observe a consistent state.
                batchFuture = moreWork ? new CompletableFuture<>() : null;
            }
            if (failure != null) {
                finished.completeExceptionally(failure);
            } else {
                finished.complete(null);
            }
            if (moreWork) {
                if (log.isTraceEnabled()) {
                    log.trace("Submitting new batch after completion of prior");
                }
                submitTask();
            }
        });
    }

    /**
     * Runs the supplied operation once the delegate is believed to be available and,
     * on failure, runs it again until the attempts are used up.
     *
     * @param supplier the operation to run; invoked again for each retry
     * @param i        number of retries still permitted after this attempt
     * @return a stage completing normally on success, or exceptionally with the last
     *         failure once no retries remain
     */
    private CompletionStage<Void> retry(Supplier<CompletionStage<Void>> supplier, int i) {
        CompletionStage<Void> attempt = getAvailabilityDelayStage().thenCompose(ignore -> supplier.get());
        return CompletionStages.handleAndCompose(attempt, (ignore, failure) -> {
            if (failure == null) {
                return CompletableFutures.completedNull();
            }
            if (i <= 0) {
                log.debug("Failed to process async operation - no more retries", failure);
                return CompletableFuture.failedFuture(failure);
            }
            int delayMillis = persistenceConfiguration.availabilityInterval();
            log.debugf(failure, "Failed to process async operation - retrying with delay of %d ms", delayMillis);
            if (delayMillis > 0) {
                // Hand the retry to the scheduler; the returned stage tracks its eventual outcome.
                RunnableCompletionStage delayedRetry = new RunnableCompletionStage(() -> retry(supplier, i - 1));
                scheduler.schedule(delayedRetry, delayMillis, TimeUnit.MILLISECONDS);
                return delayedRetry;
            }
            // No delay configured: retry right away.
            return retry(supplier, i - 1);
        });
    }

    /**
     * Writes one batch to the delegate through its {@code batch} operation.
     * Removals and puts are separated by type and grouped by segment.
     *
     * @param map the modifications making up the batch
     * @return the stage returned by the delegate's {@code batch}
     */
    private CompletionStage<Void> replicateModifications(Map<Object, Modification> map) {
        // Shared source that connects only once both the remove and the put views have
        // subscribed, so the modifications are iterated a single time.
        Flowable<Modification> shared = Flowable.fromIterable(map.values()).publish().autoConnect(2);
        return actual.batch(segmentCount,
                shared.ofType(RemoveModification.class)
                        .groupBy(Modification::getSegment, RemoveModification::getKey)
                        .map(SegmentPublisherWrapper::wrap),
                shared.ofType(PutModification.class)
                        .groupBy(Modification::getSegment, PutModification::<K, V>getEntry)
                        .map(SegmentPublisherWrapper::wrap));
    }

    /**
     * Returns a stage to wait on before contacting the delegate. It is already
     * complete when fail-silently is configured or the delegate is not currently
     * flagged as unavailable; otherwise it is the future that completes when the
     * delegate becomes available again.
     */
    private CompletionStage<Void> getAvailabilityDelayStage() {
        if (asyncConfiguration.failSilently()) {
            return CompletableFutures.completedNull();
        }
        final CompletableFuture<Void> unavailable;
        synchronized (this) {
            unavailable = delegateAvailableFuture;
        }
        if (unavailable != null) {
            return unavailable;
        }
        return CompletableFutures.completedNull();
    }

    /**
     * Publishes entries for the given segments, combining modifications that have not
     * reached the delegate yet with the delegate's own entries. The stopped check and
     * the snapshot of queued modifications happen at subscription time.
     *
     * @param intSet    segments to publish
     * @param predicate optional key filter; may be {@code null}
     * @param z         passed through as the third argument of the delegate's {@code publishEntries}
     * @return a publisher of matching entries
     */
    @Override
    public Publisher<MarshallableEntry<K, V>> publishEntries(IntSet intSet, Predicate<? super K> predicate, boolean z) {
        return Flowable.defer(() -> {
            assertNotStopped();
            if (log.isTraceEnabled()) {
                log.tracef("Publisher subscribed to retrieve entries for segments %s", intSet);
            }
            return abstractPublish(intSet, predicate, PutModification::getEntry, MarshallableEntry::getKey,
                    (segments, keyFilter) -> actual.publishEntries(segments, keyFilter, z));
        });
    }

    /**
     * Publishes keys for the given segments, combining keys of queued writes that have
     * not reached the delegate yet with the delegate's own keys. The stopped check and
     * the snapshot of queued modifications happen at subscription time.
     *
     * @param intSet    segments to publish
     * @param predicate optional key filter; may be {@code null}
     * @return a publisher of matching keys
     */
    @Override
    public Publisher<K> publishKeys(IntSet intSet, Predicate<? super K> predicate) {
        return Flowable.defer(() -> {
            assertNotStopped();
            if (log.isTraceEnabled()) {
                log.tracef("Publisher subscribed to retrieve keys for segments %s", intSet);
            }
            // The published element type is the key itself, so the mapper extracts the
            // key from each queued put and the key-of-element function is the identity.
            Function<PutModification, K> keyOfPut = put -> {
                MarshallableEntry<K, ?> entry = put.getEntry();
                return entry.getKey();
            };
            Function<K, K> identity = RxJavaInterop.identityFunction();
            return abstractPublish(intSet, predicate, keyOfPut, identity, actual::publishKeys);
        });
    }

    /**
     * Shared implementation for the key and entry publishers.
     * <p>
     * First emits elements derived from queued/in-flight puts for the requested
     * segments, then (unless a clear is queued or in flight) continues with the
     * delegate's elements, excluding any key that has a queued or in-flight modification.
     *
     * @param intSet     segments to publish
     * @param predicate  optional key filter; may be {@code null}
     * @param function   maps a queued put to the published element
     * @param function2  extracts the key from a published element
     * @param biFunction obtains the delegate's publisher for the segments and combined key filter
     * @return the combined publisher
     */
    private <E> Publisher<E> abstractPublish(IntSet intSet, Predicate<? super K> predicate, Function<PutModification, E> function, Function<E, K> function2, BiFunction<IntSet, Predicate<K>, Publisher<E>> biFunction) {
        Map.Entry<Boolean, Map<Object, Modification>> snapshot = flattenModificationMaps();
        Map<Object, Modification> queued = snapshot.getValue();

        Flowable<E> queuedElements = Flowable.fromIterable(queued.values())
                .ofType(PutModification.class)
                .filter(put -> intSet.contains(put.getSegment()))
                .map(function);
        if (predicate != null) {
            queuedElements = queuedElements.filter(element -> predicate.test(function2.apply(element)));
        }

        if (snapshot.getKey().booleanValue()) {
            // A clear is queued or in flight, so the delegate's contents are about to be wiped.
            if (log.isTraceEnabled()) {
                log.trace("Only utilizing pending modifications as clear a was found");
            }
            return queuedElements;
        }

        // Keys with a queued modification must not be read from the delegate as well.
        // Lookups go through wrapKeyIfNeeded, as in getStageFromPending, so that byte[]
        // keys match the form used for the map lookup there.
        Predicate<K> delegateFilter = key -> !queued.containsKey(wrapKeyIfNeeded(key));
        if (predicate != null) {
            delegateFilter = delegateFilter.and(predicate);
        }
        return queuedElements.concatWith(biFunction.apply(intSet, delegateFilter));
    }

    /**
     * Takes a consistent snapshot of all modifications not yet known to be in the delegate.
     * <p>
     * Queued modifications take precedence over in-flight ones for the same key, since
     * they were submitted later; this matches the lookup order of getStageFromPending.
     * If a clear is queued, in-flight modifications are left out entirely.
     *
     * @return an entry whose key tells whether a clear is queued or in flight, and whose
     *         value is a private copy of the merged modifications
     */
    private Map.Entry<Boolean, Map<Object, Modification>> flattenModificationMaps() {
        synchronized (this) {
            Map<Object, Modification> merged = new HashMap<>(pendingModifications);
            if (hasPendingClear) {
                return new AbstractMap.SimpleImmutableEntry<>(Boolean.TRUE, merged);
            }
            // putIfAbsent rather than putAll: an older in-flight modification must not
            // overwrite a newer queued one for the same key.
            replicatingModifications.forEach(merged::putIfAbsent);
            return new AbstractMap.SimpleImmutableEntry<>(Boolean.valueOf(isReplicatingClear), merged);
        }
    }

    /**
     * Loads an entry, answering from queued or in-flight modifications when the key
     * has one and falling back to the delegate otherwise.
     *
     * @param i   segment of the key
     * @param obj key to load
     * @return a stage with the entry, or {@code null} when the key was removed or cleared
     * @throws IllegalLifecycleStateException if the store is stopped
     */
    @Override
    public CompletionStage<MarshallableEntry<K, V>> load(int i, Object obj) {
        assertNotStopped();
        CompletionStage<MarshallableEntry<K, V>> queued = getStageFromPending(obj);
        if (queued == null) {
            return actual.load(i, obj);
        }
        return queued;
    }

    /**
     * Answers a load from modifications that have not reached the delegate yet.
     * Lookup order: queued modification, queued clear, in-flight modification,
     * in-flight clear.
     *
     * @param obj key to look up; wrapped via wrapKeyIfNeeded before the map lookups
     * @return the stage of the matching modification, a completed-null stage when a
     *         clear covers the key, or {@code null} when the delegate must be consulted
     */
    private CompletionStage<MarshallableEntry<K, V>> getStageFromPending(Object obj) {
        final Object lookupKey = wrapKeyIfNeeded(obj);
        synchronized (this) {
            Modification queued = pendingModifications.get(lookupKey);
            if (queued != null) {
                if (log.isTraceEnabled()) {
                    log.tracef("Found entry was pending write in async store: %s", queued);
                }
                return queued.asStage();
            }
            if (hasPendingClear) {
                if (log.isTraceEnabled()) {
                    log.trace("There is a pending clear from async store, returning null");
                }
                return CompletableFutures.completedNull();
            }

            Modification inFlight = replicatingModifications.get(lookupKey);
            if (inFlight != null) {
                if (log.isTraceEnabled()) {
                    log.tracef("Found entry was replicating write in async store: %s", inFlight);
                }
                return inFlight.asStage();
            }
            if (isReplicatingClear) {
                if (log.isTraceEnabled()) {
                    log.trace("There is a clear being replicated from async store, returning null");
                }
                return CompletableFutures.completedNull();
            }
            // Nothing known locally for this key.
            return null;
        }
    }

    /**
     * Maps a cache segment to the segment recorded on modifications: segment 0 when
     * this store works with a single segment, the given segment otherwise.
     *
     * @param i the cache segment
     * @return the segment to record
     */
    public int segmentToUse(int i) {
        return segmentCount == 1 ? 0 : i;
    }

    /**
     * Queues every removal and write from the given publishers as individual
     * modifications. Nothing is sent to the delegate directly from here.
     *
     * @param i          publisher count; passed as the second argument of {@code concatMapCompletable}
     * @param publisher  per-segment publishers of keys to remove
     * @param publisher2 per-segment publishers of entries to write
     * @return a stage completing when all modifications have been queued (and any
     *         throttling stages returned by submitModification have completed)
     * @throws IllegalLifecycleStateException if the store is stopped
     */
    @Override
    public CompletionStage<Void> batch(int i, Publisher<NonBlockingStore.SegmentedPublisher<Object>> publisher, Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> publisher2) {
        assertNotStopped();
        Completable removals = Flowable.fromPublisher(publisher)
                .flatMapCompletable(removeGroup -> Flowable.fromPublisher(removeGroup)
                        .concatMapCompletable(key -> Completable.fromCompletionStage(
                                submitModification(new RemoveModification(segmentToUse(removeGroup.getSegment()), key))), i));
        Completable writes = Flowable.fromPublisher(publisher2)
                .flatMapCompletable(writeGroup -> Flowable.fromPublisher(writeGroup)
                        .concatMapCompletable(entry -> Completable.fromCompletionStage(
                                submitModification(new PutModification(segmentToUse(writeGroup.getSegment()), entry))), i));
        return removals.mergeWith(writes).toCompletionStage(null);
    }

    /**
     * Queues a modification and starts a batch if none is currently running.
     *
     * @param modification the modification to queue
     * @return an already-completed stage while the number of queued plus in-flight
     *         modifications is within the configured limit; otherwise a stage that
     *         completes, on the non-blocking executor, when the current batch finishes
     */
    CompletionStage<Void> submitModification(Modification modification) {
        final boolean trace = log.isTraceEnabled();
        final boolean startBatch;
        final CompletableFuture<Void> throttle;
        synchronized (this) {
            int inFlightBatchId = 0;
            if (trace) {
                inFlightBatchId = System.identityHashCode(replicatingModifications);
                log.tracef("Adding modification %s to batch %s", modification, Integer.valueOf(System.identityHashCode(pendingModifications)));
            }
            // The modification adds itself to the queue.
            modification.apply(this);

            startBatch = batchFuture == null;
            if (startBatch) {
                batchFuture = new CompletableFuture<>();
            }

            int outstanding = pendingModifications.size() + replicatingModifications.size();
            if (outstanding > modificationQueueSize) {
                throttle = batchFuture;
            } else {
                throttle = null;
            }
            if (throttle != null && trace) {
                log.tracef("Too many modifications queued (%d), operation must wait until previous batch %d completes", outstanding, inFlightBatchId);
            }
        }
        // The batch is kicked off outside the lock.
        if (startBatch) {
            submitTask();
        }
        if (throttle == null) {
            return CompletableFutures.completedNull();
        }
        return throttle.thenApplyAsync(CompletableFutures.toNullFunction(), nonBlockingExecutor);
    }

    /**
     * Queues a write of the given entry.
     *
     * @param i                segment of the entry
     * @param marshallableEntry entry to write
     * @return the stage returned by submitModification
     * @throws IllegalLifecycleStateException if the store is stopped
     */
    @Override
    public CompletionStage<Void> write(int i, MarshallableEntry<? extends K, ? extends V> marshallableEntry) {
        assertNotStopped();
        PutModification put = new PutModification(segmentToUse(i), marshallableEntry);
        return submitModification(put);
    }

    /**
     * Queues a removal of the given key.
     *
     * @param i   segment of the key
     * @param obj key to remove
     * @return a stage that completes with {@code null} rather than a boolean; the
     *         removal is only queued here, so no removed/not-removed result is available
     * @throws IllegalLifecycleStateException if the store is stopped
     */
    @Override
    public CompletionStage<Boolean> delete(int i, Object obj) {
        assertNotStopped();
        // submitModification yields CompletionStage<Void>, whose only value is null.
        // Re-typing it through erasure is safe and avoids allocating an extra stage.
        @SuppressWarnings({"unchecked", "rawtypes"})
        CompletionStage<Boolean> result =
                (CompletionStage) submitModification(new RemoveModification(segmentToUse(i), obj));
        return result;
    }

    /**
     * Queues a clear of the store.
     *
     * @return an already-completed stage; the stage returned by submitModification is
     *         not propagated, so callers never wait on a clear
     * @throws IllegalLifecycleStateException if the store is stopped
     */
    @Override // org.infinispan.persistence.support.DelegatingNonBlockingStore, org.infinispan.persistence.spi.NonBlockingStore
    public CompletionStage<Void> clear() {
        assertNotStopped();
        // The returned stage is intentionally dropped.
        submitModification(ClearModification.INSTANCE);
        return CompletableFutures.completedNull();
    }

    /**
     * Delegates expiration purging to the wrapped store. The stopped check runs at
     * subscription time rather than when this method is called.
     *
     * @return a publisher of the entries purged by the delegate
     */
    @Override
    public Publisher<MarshallableEntry<K, V>> purgeExpired() {
        return Flowable.defer(() -> {
            assertNotStopped();
            Publisher<MarshallableEntry<K, V>> purged = actual.purgeExpired();
            return purged;
        });
    }

    /**
     * Passes the added segments straight to the delegate.
     *
     * @param intSet segments being added
     * @return the delegate's stage
     * @throws IllegalLifecycleStateException if the store is stopped
     */
    @Override
    public CompletionStage<Void> addSegments(IntSet intSet) {
        assertNotStopped();
        return actual.addSegments(intSet);
    }

    /**
     * Drops queued modifications that belong to the removed segments, then forwards
     * the removal to the delegate. Modifications of a batch already in flight are
     * not touched.
     *
     * @param intSet segments being removed
     * @return the delegate's stage
     * @throws IllegalLifecycleStateException if the store is stopped
     */
    @Override
    public CompletionStage<Void> removeSegments(IntSet intSet) {
        assertNotStopped();
        synchronized (this) {
            pendingModifications.values().removeIf(queued -> intSet.contains(queued.getSegment()));
        }
        return actual.removeSegments(intSet);
    }

    /**
     * Returns the delegate's size for the given segments. Modifications still queued
     * in this wrapper are not taken into account.
     *
     * @param intSet segments to count
     * @return the delegate's stage
     * @throws IllegalLifecycleStateException if the store is stopped
     */
    @Override
    public CompletionStage<Long> size(IntSet intSet) {
        assertNotStopped();
        return actual.size(intSet);
    }

    /**
     * Returns the delegate's approximate size for the given segments. Modifications
     * still queued in this wrapper are not taken into account.
     *
     * @param intSet segments to count
     * @return the delegate's stage
     * @throws IllegalLifecycleStateException if the store is stopped
     */
    @Override
    public CompletionStage<Long> approximateSize(IntSet intSet) {
        assertNotStopped();
        return actual.approximateSize(intSet);
    }

    /**
     * Reports whether this wrapper can currently accept operations.
     * <ul>
     *   <li>Stopped: never available.</li>
     *   <li>Fail-silently configured: always available.</li>
     *   <li>Superclass check reports available: available; any retry waiting on the
     *       availability future is released.</li>
     *   <li>Superclass check reports unavailable: still available while the queue is
     *       below the configured size or no batch is in flight.</li>
     * </ul>
     *
     * @return a stage with the availability
     */
    @Override
    public CompletionStage<Boolean> isAvailable() {
        if (stopped) {
            return CompletableFutures.completedFalse();
        }
        if (asyncConfiguration.failSilently()) {
            return CompletableFutures.completedTrue();
        }
        return super.isAvailable().thenApply(delegateAvailable -> {
            if (delegateAvailable.booleanValue()) {
                final CompletableFuture<Void> waiters;
                synchronized (this) {
                    waiters = delegateAvailableFuture;
                    delegateAvailableFuture = null;
                }
                if (waiters != null) {
                    log.debugf("Underlying delegate %s is now available", actual);
                    waiters.complete(null);
                }
                return true;
            }

            final boolean batchInFlight;
            final int queued;
            final boolean becameUnavailable;
            synchronized (this) {
                batchInFlight = !replicatingModifications.isEmpty() || isReplicatingClear;
                queued = pendingModifications.size();
                becameUnavailable = delegateAvailableFuture == null;
                if (becameUnavailable) {
                    // First time the outage is noticed: create the future retries will wait on.
                    delegateAvailableFuture = new CompletableFuture<>();
                }
            }
            if (becameUnavailable) {
                log.debugf("Underlying delegate %s is now unavailable!", actual);
            }
            return Boolean.valueOf(queued < modificationQueueSize || !batchInFlight);
        });
    }

    /**
     * Returns the wrapped store.
     *
     * @return the store passed to the constructor
     */
    @Override
    public NonBlockingStore<K, V> delegate() {
        return actual;
    }

    /**
     * Guards operations that must not run on a stopped store.
     *
     * @throws IllegalLifecycleStateException if the store has not been started or has been stopped
     */
    private void assertNotStopped() throws CacheException {
        if (!stopped) {
            return;
        }
        throw new IllegalLifecycleStateException("AsyncCacheWriter stopped; no longer accepting more entries.");
    }

    /**
     * Returns a form of the key usable for map lookups: a {@code byte[]} is wrapped in
     * a {@link WrappedByteArray}, any other key is returned as is.
     *
     * @param obj the key
     * @return the wrapped key for byte arrays, otherwise {@code obj} itself
     */
    public static Object wrapKeyIfNeeded(Object obj) {
        if (obj instanceof byte[]) {
            return new WrappedByteArray((byte[]) obj);
        }
        return obj;
    }

    // Static initializer: assigns the two static final fields declared at the top of the class.
    static {
        // True when assertions are NOT enabled for this class; guards the explicit AssertionError checks.
        $assertionsDisabled = !AsyncNonBlockingStore.class.desiredAssertionStatus();
        // Logger obtained for the class returned by the lookup performed in this initializer.
        log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    }
}
