package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.functions.Function;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.reactive.RxJavaInterop;
import org.infinispan.reactive.publisher.impl.Notifications;
import org.infinispan.reactive.publisher.impl.SegmentAwarePublisherSupplier;
import org.infinispan.reactive.publisher.impl.commands.batch.InitialPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.KeyPublisherResponse;
import org.infinispan.reactive.publisher.impl.commands.batch.PublisherResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

@Scope(Scopes.NAMED_CACHE)
@Listener(observation = Listener.Observation.POST)
/* loaded from: input_file:org/infinispan/reactive/publisher/impl/PublisherHandler.class */
public class PublisherHandler {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private final ConcurrentMap<Object, PublisherState> currentRequests = new ConcurrentHashMap();

    @Inject
    CacheManagerNotifier managerNotifier;

    @ComponentName(KnownComponentNames.NON_BLOCKING_EXECUTOR)
    @Inject
    ExecutorService nonBlockingExecutor;

    @Inject
    LocalPublisherManager localPublisherManager;

    /* loaded from: input_file:org/infinispan/reactive/publisher/impl/PublisherHandler$KeyPublisherState.class */
    class KeyPublisherState extends PublisherState {
        Object[] extraValues;
        int extraPos;
        Object[] keys;
        int keyPos;
        int keyStartPosition;
        static final /* synthetic */ boolean $assertionsDisabled;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/infinispan/reactive/publisher/impl/PublisherHandler$KeyPublisherState$KeyCompleted.class */
        public class KeyCompleted<E> extends Notifications.ReuseNotificationBuilder<E> {
            KeyCompleted() {
            }

            @Override // org.infinispan.reactive.publisher.impl.Notifications.ReuseNotificationBuilder
            public String toString() {
                return "KeyCompleted{key=" + String.valueOf(this.value) + ", segment=" + this.segment + "}";
            }
        }

        private KeyPublisherState(String str, Address address, int i) {
            super(str, address, i);
        }

        @Override // org.infinispan.reactive.publisher.impl.PublisherHandler.PublisherState
        void startProcessing(InitialPublisherCommand initialPublisherCommand) {
            SegmentAwarePublisherSupplier keyPublisher;
            Function identityFunction;
            if (initialPublisherCommand.isEntryStream()) {
                keyPublisher = PublisherHandler.this.localPublisherManager.entryPublisher(initialPublisherCommand.getSegments(), initialPublisherCommand.getKeys(), initialPublisherCommand.getExcludedKeys(), initialPublisherCommand.getExplicitFlags(), DeliveryGuarantee.EXACTLY_ONCE, java.util.function.Function.identity());
                identityFunction = RxJavaInterop.entryToKeyFunction();
            } else {
                keyPublisher = PublisherHandler.this.localPublisherManager.keyPublisher(initialPublisherCommand.getSegments(), initialPublisherCommand.getKeys(), initialPublisherCommand.getExcludedKeys(), initialPublisherCommand.getExplicitFlags(), DeliveryGuarantee.EXACTLY_ONCE, java.util.function.Function.identity());
                identityFunction = RxJavaInterop.identityFunction();
            }
            java.util.function.Function transformer = initialPublisherCommand.getTransformer();
            Notifications.NotificationBuilder reuseBuilder = Notifications.reuseBuilder();
            KeyCompleted keyCompleted = new KeyCompleted();
            Function function = identityFunction;
            Flowable.fromPublisher(keyPublisher.publisherWithLostSegments()).concatMap(notificationWithLost -> {
                if (!notificationWithLost.isValue()) {
                    return Flowable.just(notificationWithLost);
                }
                R value = notificationWithLost.value();
                return Flowable.fromPublisher((Publisher) transformer.apply(Flowable.just(value))).map(obj -> {
                    return reuseBuilder.value(obj, notificationWithLost.valueSegment());
                }).concatWith(Single.just(keyCompleted.value(function.apply(value), notificationWithLost.valueSegment())));
            }).subscribe((FlowableSubscriber) this);
        }

        @Override // org.infinispan.reactive.publisher.impl.PublisherHandler.PublisherState
        PublisherResponse generateResponse(boolean z) {
            return new KeyPublisherResponse(this.results, this.completedSegments, this.lostSegments, this.pos, z, this.segmentResults == null ? Collections.emptyList() : this.segmentResults, this.extraValues, this.extraPos, this.keys, this.keyPos);
        }

        @Override // org.infinispan.reactive.publisher.impl.PublisherHandler.PublisherState, org.reactivestreams.Subscriber
        public void onNext(SegmentAwarePublisherSupplier.NotificationWithLost notificationWithLost) {
            if (!notificationWithLost.isValue()) {
                super.onNext(notificationWithLost);
                return;
            }
            boolean z = true;
            if (notificationWithLost instanceof KeyCompleted) {
                if (this.keyStartPosition != this.pos) {
                    Object value = notificationWithLost.value();
                    if (this.keys == null) {
                        this.keys = new Object[this.batchSize];
                    }
                    Object[] objArr = this.keys;
                    int i = this.keyPos;
                    this.keyPos = i + 1;
                    objArr[i] = value;
                    if (this.pos == this.results.length) {
                        prepareResponse(false);
                        z = false;
                    } else {
                        this.keyStartPosition = this.pos;
                    }
                }
                if (z) {
                    requestMore(this.upstream, 1);
                    return;
                }
                return;
            }
            int valueSegment = notificationWithLost.valueSegment();
            if (!$assertionsDisabled && this.currentSegment != valueSegment && this.currentSegment != -1) {
                throw new AssertionError();
            }
            this.currentSegment = valueSegment;
            this.segmentEntries++;
            Object value2 = notificationWithLost.value();
            if (this.pos != this.results.length) {
                Object[] objArr2 = this.results;
                int i2 = this.pos;
                this.pos = i2 + 1;
                objArr2[i2] = value2;
                if (this.pos == this.results.length) {
                    requestMore(this.upstream, 1);
                    return;
                }
                return;
            }
            if (this.extraValues == null) {
                this.extraValues = new Object[8];
            }
            if (this.extraPos == this.extraValues.length) {
                Object[] objArr3 = new Object[this.extraValues.length << 1];
                System.arraycopy(this.extraValues, 0, objArr3, 0, this.extraPos);
                this.extraValues = objArr3;
            }
            Object[] objArr4 = this.extraValues;
            int i3 = this.extraPos;
            this.extraPos = i3 + 1;
            objArr4[i3] = value2;
            requestMore(this.upstream, 1);
        }

        @Override // org.infinispan.reactive.publisher.impl.PublisherHandler.PublisherState
        void resetValues() {
            super.resetValues();
            keyResetValues();
        }

        void keyResetValues() {
            this.extraValues = null;
            this.extraPos = 0;
            this.keys = null;
            this.keyPos = 0;
            this.keyStartPosition = 0;
        }

        @Override // org.infinispan.reactive.publisher.impl.PublisherHandler.PublisherState
        public void segmentComplete(int i) {
            super.segmentComplete(i);
            this.keys = null;
            this.keyPos = 0;
            this.keyStartPosition = 0;
        }

        @Override // org.infinispan.reactive.publisher.impl.PublisherHandler.PublisherState
        public void segmentLost(int i) {
            super.segmentLost(i);
            keyResetValues();
        }

        static {
            $assertionsDisabled = !PublisherHandler.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/reactive/publisher/impl/PublisherHandler$PublisherState.class */
    public class PublisherState implements FlowableSubscriber<SegmentAwarePublisherSupplier.NotificationWithLost<Object>>, Runnable {
        final String requestId;
        final Address origin;
        final int batchSize;
        Subscription upstream;
        Object[] results;
        List<SegmentResult> segmentResults;
        int pos;
        IntSet completedSegments;
        IntSet lostSegments;
        int segmentEntries;
        volatile boolean complete;
        static final /* synthetic */ boolean $assertionsDisabled;

        @GuardedBy("this")
        private CompletableFuture<PublisherResponse> futureResponse = null;
        int currentSegment = -1;

        private PublisherState(String str, Address address, int i) {
            this.requestId = str;
            this.origin = address;
            this.batchSize = i;
            this.results = new Object[i];
        }

        void startProcessing(InitialPublisherCommand initialPublisherCommand) {
            Flowable.fromPublisher((initialPublisherCommand.isEntryStream() ? PublisherHandler.this.localPublisherManager.entryPublisher(initialPublisherCommand.getSegments(), initialPublisherCommand.getKeys(), initialPublisherCommand.getExcludedKeys(), initialPublisherCommand.getExplicitFlags(), initialPublisherCommand.getDeliveryGuarantee(), initialPublisherCommand.getTransformer()) : PublisherHandler.this.localPublisherManager.keyPublisher(initialPublisherCommand.getSegments(), initialPublisherCommand.getKeys(), initialPublisherCommand.getExcludedKeys(), initialPublisherCommand.getExplicitFlags(), initialPublisherCommand.getDeliveryGuarantee(), initialPublisherCommand.getTransformer())).publisherWithLostSegments(true)).subscribe((FlowableSubscriber) this);
        }

        @Override // io.reactivex.rxjava3.core.FlowableSubscriber, org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            if (this.upstream != null) {
                throw new IllegalStateException("Subscription was already set!");
            }
            this.upstream = (Subscription) Objects.requireNonNull(subscription);
            requestMore(subscription, this.batchSize);
        }

        protected void requestMore(Subscription subscription, int i) {
            subscription.request(i);
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            this.complete = true;
            PublisherHandler.log.trace("Exception encountered while processing publisher", th);
            synchronized (this) {
                if (this.futureResponse == null) {
                    this.futureResponse = CompletableFuture.failedFuture(th);
                } else {
                    this.futureResponse.completeExceptionally(th);
                }
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            prepareResponse(true);
            if (PublisherHandler.log.isTraceEnabled()) {
                PublisherHandler.log.tracef("Completed state for %s", this.requestId);
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(SegmentAwarePublisherSupplier.NotificationWithLost notificationWithLost) {
            if (!notificationWithLost.isValue()) {
                if (notificationWithLost.isSegmentComplete()) {
                    int completedSegment = notificationWithLost.completedSegment();
                    if (this.segmentEntries > 0) {
                        addToSegmentResults(completedSegment, this.segmentEntries);
                    }
                    segmentComplete(completedSegment);
                } else {
                    segmentLost(notificationWithLost.lostSegment());
                }
                requestMore(this.upstream, 1);
                return;
            }
            int valueSegment = notificationWithLost.valueSegment();
            if (!$assertionsDisabled && this.currentSegment != valueSegment && this.currentSegment != -1) {
                throw new AssertionError();
            }
            this.currentSegment = valueSegment;
            this.segmentEntries++;
            Object[] objArr = this.results;
            int i = this.pos;
            this.pos = i + 1;
            objArr[i] = notificationWithLost.value();
            if (this.pos == this.results.length) {
                prepareResponse(false);
            }
        }

        public void segmentComplete(int i) {
            if (!$assertionsDisabled && this.currentSegment != i && this.currentSegment != -1) {
                throw new AssertionError();
            }
            if (PublisherHandler.log.isTraceEnabled()) {
                PublisherHandler.log.tracef("Completing segment %s for %s", i, (Object) this.requestId);
            }
            if (this.completedSegments == null) {
                this.completedSegments = IntSets.mutableEmptySet();
            }
            this.completedSegments.set(i);
            this.segmentEntries = 0;
            this.currentSegment = -1;
        }

        public void segmentLost(int i) {
            if (!$assertionsDisabled && this.currentSegment != i && this.currentSegment != -1) {
                throw new AssertionError();
            }
            if (PublisherHandler.log.isTraceEnabled()) {
                PublisherHandler.log.tracef("Lost segment %s for %s", i, (Object) this.requestId);
            }
            if (this.lostSegments == null) {
                this.lostSegments = IntSets.mutableEmptySet();
            }
            this.lostSegments.set(i);
            this.pos -= this.segmentEntries;
            this.segmentEntries = 0;
            this.currentSegment = -1;
        }

        public void cancel() {
            Subscription subscription = this.upstream;
            if (subscription != null) {
                subscription.cancel();
            }
        }

        void resetValues() {
            this.results = new Object[this.batchSize];
            this.segmentResults = null;
            this.completedSegments = null;
            this.lostSegments = null;
            this.pos = 0;
            this.currentSegment = -1;
            this.segmentEntries = 0;
        }

        PublisherResponse generateResponse(boolean z) {
            return new PublisherResponse(this.results, this.completedSegments, this.lostSegments, this.pos, z, this.segmentResults == null ? Collections.emptyList() : this.segmentResults);
        }

        void prepareResponse(boolean z) {
            if (this.currentSegment != -1) {
                addToSegmentResults(this.currentSegment, this.segmentEntries);
            }
            PublisherResponse generateResponse = generateResponse(z);
            if (PublisherHandler.log.isTraceEnabled()) {
                PublisherHandler.log.tracef("Response ready %s with id %s for requestor %s", generateResponse, this.requestId, this.origin);
            }
            if (!z) {
                resetValues();
            }
            this.complete = z;
            CompletableFuture<PublisherResponse> completableFuture = null;
            synchronized (this) {
                if (this.futureResponse != null) {
                    if (this.futureResponse.isDone()) {
                        if (!this.futureResponse.isCompletedExceptionally()) {
                            throw new IllegalStateException("Response already completed with " + String.valueOf(CompletionStages.join(this.futureResponse)) + " but we want to complete with " + String.valueOf(generateResponse));
                        }
                        PublisherHandler.log.tracef("Response %s already completed with an exception, ignoring values", System.identityHashCode(this.futureResponse));
                    }
                    completableFuture = this.futureResponse;
                    this.futureResponse = null;
                } else {
                    this.futureResponse = CompletableFuture.completedFuture(generateResponse);
                    if (PublisherHandler.log.isTraceEnabled()) {
                        PublisherHandler.log.tracef("Eager response completed %d for request id %s", System.identityHashCode(this.futureResponse), (Object) this.requestId);
                    }
                }
            }
            if (completableFuture != null) {
                if (PublisherHandler.log.isTraceEnabled()) {
                    PublisherHandler.log.tracef("Completing waiting future %d for request id %s", System.identityHashCode(completableFuture), (Object) this.requestId);
                }
                completableFuture.complete(generateResponse);
            }
        }

        public Address getOrigin() {
            return this.origin;
        }

        CompletableFuture<PublisherResponse> results() {
            CompletableFuture<PublisherResponse> completableFuture;
            boolean z = false;
            synchronized (this) {
                if (this.futureResponse == null) {
                    completableFuture = new CompletableFuture<>();
                    completableFuture.thenRunAsync((Runnable) this, (Executor) PublisherHandler.this.nonBlockingExecutor);
                    this.futureResponse = completableFuture;
                } else {
                    completableFuture = this.futureResponse;
                    this.futureResponse = null;
                    z = true;
                }
            }
            if (z) {
                PublisherHandler.this.nonBlockingExecutor.execute(this);
            }
            if (PublisherHandler.log.isTraceEnabled()) {
                PublisherHandler.log.tracef("Retrieved future %d for request id %s", System.identityHashCode(completableFuture), (Object) this.requestId);
            }
            return completableFuture;
        }

        void addToSegmentResults(int i, int i2) {
            if (this.segmentResults == null) {
                this.segmentResults = new ArrayList();
            }
            this.segmentResults.add(new SegmentResult(i, i2));
        }

        @Override // java.lang.Runnable
        public void run() {
            if (PublisherHandler.log.isTraceEnabled()) {
                PublisherHandler.log.tracef("Running handler for request id %s", this.requestId);
            }
            if (!this.complete) {
                int i = this.batchSize;
                if (PublisherHandler.log.isTraceEnabled()) {
                    PublisherHandler.log.tracef("Requesting %d additional entries for %s", i, (Object) this.requestId);
                }
                requestMore(this.upstream, i);
                return;
            }
            synchronized (this) {
                if (this.futureResponse == null) {
                    PublisherHandler.this.closePublisher(this.requestId, this);
                } else if (PublisherHandler.log.isTraceEnabled()) {
                    PublisherHandler.log.tracef("Skipping run as handler is complete, but still has some results for id %s", this.requestId);
                }
            }
        }

        static {
            $assertionsDisabled = !PublisherHandler.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/infinispan/reactive/publisher/impl/PublisherHandler$SegmentResult.class */
    public static class SegmentResult {
        private final int segment;
        private final int entryCount;

        public SegmentResult(int i, int i2) {
            this.segment = i;
            this.entryCount = i2;
        }

        public int getEntryCount() {
            return this.entryCount;
        }

        public int getSegment() {
            return this.segment;
        }

        public String toString() {
            return "SegmentResult{segment=" + this.segment + ", entryCount=" + this.entryCount + "}";
        }
    }

    @ViewChanged
    public void viewChange(ViewChangedEvent viewChangedEvent) {
        List<Address> newMembers = viewChangedEvent.getNewMembers();
        Iterator<PublisherState> it = this.currentRequests.values().iterator();
        while (it.hasNext()) {
            PublisherState next = it.next();
            Address origin = next.getOrigin();
            if (origin != null && !newMembers.contains(origin)) {
                log.tracef("View changed and no longer contains %s, closing %s publisher", origin, next.requestId);
                next.cancel();
                it.remove();
            }
        }
    }

    @Start
    public void start() {
        this.managerNotifier.addListener(this);
    }

    @Stop
    public void stop() {
        this.managerNotifier.removeListener(this);
    }

    public <I, R> CompletableFuture<PublisherResponse> register(InitialPublisherCommand<?, I, R> initialPublisherCommand) {
        String requestId = initialPublisherCommand.getRequestId();
        PublisherState keyPublisherState = initialPublisherCommand.isTrackKeys() ? new KeyPublisherState(requestId, initialPublisherCommand.getOrigin(), initialPublisherCommand.getBatchSize()) : new PublisherState(requestId, initialPublisherCommand.getOrigin(), initialPublisherCommand.getBatchSize());
        PublisherState put = this.currentRequests.put(requestId, keyPublisherState);
        if (put != null) {
            if (!put.complete) {
                this.currentRequests.remove(requestId);
                throw new IllegalStateException("There was already a publisher registered for id " + requestId + " that wasn't complete!");
            }
            if (log.isTraceEnabled()) {
                log.tracef("Closing prior state for %s to make room for a new request", requestId);
            }
            put.cancel();
        }
        keyPublisherState.startProcessing(initialPublisherCommand);
        return keyPublisherState.results();
    }

    public CompletableFuture<PublisherResponse> getNext(String str) {
        PublisherState publisherState = this.currentRequests.get(str);
        if (publisherState == null) {
            throw new IllegalStateException("Publisher for requestId " + str + " doesn't exist!");
        }
        return publisherState.results();
    }

    public int openPublishers() {
        return this.currentRequests.size();
    }

    public void closePublisher(String str) {
        PublisherState remove = this.currentRequests.remove(str);
        if (remove != null) {
            if (log.isTraceEnabled()) {
                log.tracef("Closed publisher using requestId %s", str);
            }
            remove.cancel();
        }
    }

    private void closePublisher(String str, PublisherState publisherState) {
        if (this.currentRequests.remove(str, publisherState)) {
            if (log.isTraceEnabled()) {
                log.tracef("Closed publisher from completion using requestId %s", str);
            }
            publisherState.cancel();
        } else if (log.isTraceEnabled()) {
            log.tracef("A concurrent request already closed the prior state for %s", str);
        }
    }
}
