/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsarmetadatastoreoxia.shaded.com.salesforce.reactorgrpc.stub;

import io.streamnative.pulsarmetadatastoreoxia.shaded.com.google.common.base.Preconditions;
import io.streamnative.pulsarmetadatastoreoxia.shaded.com.salesforce.reactorgrpc.stub.ReactorCallOptions;
import io.streamnative.pulsarmetadatastoreoxia.shaded.com.salesforce.reactorgrpc.stub.ReactorServerStreamObserverAndPublisher;
import io.streamnative.pulsarmetadatastoreoxia.shaded.com.salesforce.reactorgrpc.stub.ReactorSubscriberAndServerProducer;
import io.streamnative.pulsarmetadatastoreoxia.shaded.io.grpc.CallOptions;
import io.streamnative.pulsarmetadatastoreoxia.shaded.io.grpc.Status;
import io.streamnative.pulsarmetadatastoreoxia.shaded.io.grpc.StatusException;
import io.streamnative.pulsarmetadatastoreoxia.shaded.io.grpc.StatusRuntimeException;
import io.streamnative.pulsarmetadatastoreoxia.shaded.io.grpc.stub.ServerCallStreamObserver;
import io.streamnative.pulsarmetadatastoreoxia.shaded.io.grpc.stub.StreamObserver;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.core.Disposable;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.core.publisher.Flux;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.core.publisher.Mono;
import java.util.function.Function;

/**
 * Static helpers that connect gRPC server-side {@link StreamObserver}s to Reactor
 * {@link Mono}/{@link Flux} based service implementations.
 *
 * <p>Each entry point applies the supplied {@code delegate}, subscribes to the publisher it
 * returns, and forwards the resulting signals to the gRPC {@code responseObserver}. Errors are
 * passed through the caller-supplied {@code prepareError} function before being handed to
 * {@link StreamObserver#onError(Throwable)}.
 */
public final class ServerCalls {
    private ServerCalls() {
    }

    /**
     * Handles a call with a single request and a single response.
     *
     * <p>The value emitted by the delegate's {@link Mono} is forwarded to
     * {@code responseObserver.onNext}, unless the observer is a {@link ServerCallStreamObserver}
     * that reports {@code isCancelled()}. When the observer is a {@link ServerCallStreamObserver},
     * the subscription is also disposed from its cancel and close handlers.
     *
     * @param request the incoming request message
     * @param responseObserver the observer that receives the response signals
     * @param delegate produces the response {@link Mono} for {@code request}; must not return null
     * @param prepareError maps any failure to the {@link Throwable} reported via {@code onError}
     * @param <TRequest> request message type
     * @param <TResponse> response message type
     */
    public static <TRequest, TResponse> void oneToOne(TRequest request, StreamObserver<TResponse> responseObserver, Function<TRequest, Mono<TResponse>> delegate, Function<Throwable, Throwable> prepareError) {
        // Resolve the optional server-call view once. The original code guarded isCancelled()
        // with instanceof but then cast unconditionally when registering the handlers, which
        // raised ClassCastException (and a late onError) for any other StreamObserver.
        ServerCallStreamObserver<?> serverObserver = responseObserver instanceof ServerCallStreamObserver
                ? (ServerCallStreamObserver<?>) responseObserver
                : null;
        try {
            Mono<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(request));
            Disposable subscription = rxResponse.subscribe(
                    value -> {
                        // Do not emit a response on a call that is already cancelled.
                        if (serverObserver != null && serverObserver.isCancelled()) {
                            return;
                        }
                        responseObserver.onNext(value);
                    },
                    throwable -> responseObserver.onError(prepareError.apply(throwable)),
                    responseObserver::onCompleted);
            if (serverObserver != null) {
                ServerCalls.cancelSubscriptionOnCallEnd(subscription, serverObserver);
            }
        }
        catch (Throwable throwable) {
            responseObserver.onError(prepareError.apply(throwable));
        }
    }

    /**
     * Handles a call with a single request and a stream of responses.
     *
     * <p>The delegate's {@link Flux} is subscribed with a {@link ReactorSubscriberAndServerProducer},
     * which is then attached to {@code responseObserver}. The observer is cast to
     * {@link ServerCallStreamObserver}; if that cast (or any other setup step) fails, the failure
     * is mapped through {@code prepareError} and reported via {@code onError}.
     *
     * @param request the incoming request message
     * @param responseObserver the observer that receives the response signals
     * @param delegate produces the response {@link Flux} for {@code request}; must not return null
     * @param prepareError maps any failure to the {@link Throwable} reported via {@code onError}
     * @param <TRequest> request message type
     * @param <TResponse> response message type
     */
    public static <TRequest, TResponse> void oneToMany(TRequest request, StreamObserver<TResponse> responseObserver, Function<TRequest, Flux<TResponse>> delegate, Function<Throwable, Throwable> prepareError) {
        try {
            Flux<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(request));
            ReactorSubscriberAndServerProducer server = rxResponse.subscribeWith(new ReactorSubscriberAndServerProducer(prepareError::apply));
            server.subscribe((ServerCallStreamObserver)responseObserver);
        }
        catch (Throwable throwable) {
            responseObserver.onError(prepareError.apply(throwable));
        }
    }

    /**
     * Handles a call with a stream of requests and a single response.
     *
     * <p>A {@link ReactorServerStreamObserverAndPublisher} is created around
     * {@code responseObserver} (which must be a {@link ServerCallStreamObserver}; the cast happens
     * outside the try block, so a mismatch propagates to the caller). It is exposed to the
     * delegate as a {@link Flux} of requests and returned to gRPC as the request observer.
     *
     * @param responseObserver the observer that receives the response signals
     * @param delegate turns the request {@link Flux} into the response {@link Mono}; must not
     *     return null
     * @param prepareError maps any failure to the {@link Throwable} reported via {@code onError}
     * @param options call options from which the prefetch and low-tide values are read
     * @param <TRequest> request message type
     * @param <TResponse> response message type
     * @return the observer gRPC should feed incoming request messages into
     */
    public static <TRequest, TResponse> StreamObserver<TRequest> manyToOne(StreamObserver<TResponse> responseObserver, Function<Flux<TRequest>, Mono<TResponse>> delegate, Function<Throwable, Throwable> prepareError, CallOptions options) {
        int prefetch = ReactorCallOptions.getPrefetch(options);
        int lowTide = ReactorCallOptions.getLowTide(options);
        ReactorServerStreamObserverAndPublisher streamObserverPublisher = new ReactorServerStreamObserverAndPublisher((ServerCallStreamObserver)responseObserver, null, prefetch, lowTide);
        try {
            Mono<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(Flux.from(streamObserverPublisher)));
            Disposable subscription = rxResponse.subscribe(
                    value -> {
                        // Skip the response once the request side reports cancellation.
                        if (!streamObserverPublisher.isCancelled()) {
                            responseObserver.onNext(value);
                        }
                    },
                    throwable -> {
                        if (!streamObserverPublisher.isCancelled()) {
                            // Abort the pending cancel before reporting the error downstream.
                            streamObserverPublisher.abortPendingCancel();
                            responseObserver.onError(prepareError.apply(throwable));
                        }
                    },
                    responseObserver::onCompleted);
            ServerCalls.cancelSubscriptionOnCallEnd(subscription, (ServerCallStreamObserver<?>) responseObserver);
        }
        catch (Throwable throwable) {
            responseObserver.onError(prepareError.apply(throwable));
        }
        return streamObserverPublisher;
    }

    /**
     * Handles a call with a stream of requests and a stream of responses.
     *
     * <p>A {@link ReactorServerStreamObserverAndPublisher} is created around
     * {@code responseObserver} (which must be a {@link ServerCallStreamObserver}) and exposed to
     * the delegate as a {@link Flux} of requests. The response {@link Flux} is consumed by a
     * {@link ReactorSubscriberAndServerProducer} that is attached to the observer before it is
     * subscribed to the response publisher.
     *
     * @param responseObserver the observer that receives the response signals
     * @param delegate turns the request {@link Flux} into the response {@link Flux}; must not
     *     return null
     * @param prepareError maps any failure to the {@link Throwable} reported via {@code onError}
     * @param options call options from which the prefetch and low-tide values are read
     * @param <TRequest> request message type
     * @param <TResponse> response message type
     * @return the observer gRPC should feed incoming request messages into
     */
    public static <TRequest, TResponse> StreamObserver<TRequest> manyToMany(StreamObserver<TResponse> responseObserver, Function<Flux<TRequest>, Flux<TResponse>> delegate, Function<Throwable, Throwable> prepareError, CallOptions options) {
        int prefetch = ReactorCallOptions.getPrefetch(options);
        int lowTide = ReactorCallOptions.getLowTide(options);
        ReactorServerStreamObserverAndPublisher streamObserverPublisher = new ReactorServerStreamObserverAndPublisher((ServerCallStreamObserver)responseObserver, null, prefetch, lowTide);
        try {
            Flux<TResponse> rxResponse = Preconditions.checkNotNull(delegate.apply(Flux.from(streamObserverPublisher)));
            ReactorSubscriberAndServerProducer subscriber = new ReactorSubscriberAndServerProducer(prepareError::apply);
            // Attach to the gRPC observer first, then start consuming the response stream.
            subscriber.subscribe((ServerCallStreamObserver)responseObserver);
            rxResponse.subscribe(subscriber);
        }
        catch (Throwable throwable) {
            responseObserver.onError(prepareError.apply(throwable));
        }
        return streamObserverPublisher;
    }

    /**
     * Default error mapping: returns {@link StatusException} and {@link StatusRuntimeException}
     * instances unchanged and converts anything else with
     * {@code Status.fromThrowable(throwable).asException()}.
     *
     * @param throwable the failure to convert
     * @return {@code throwable} itself when it already is a status exception, otherwise the
     *     exception built from the derived {@link Status}
     */
    public static Throwable prepareError(Throwable throwable) {
        if (throwable instanceof StatusException || throwable instanceof StatusRuntimeException) {
            return throwable;
        }
        return Status.fromThrowable(throwable).asException();
    }

    /**
     * Registers {@code subscription::dispose} as both the cancel handler and the close handler of
     * {@code responseObserver}, so the subscription is disposed when either handler runs.
     *
     * @param subscription the subscription to dispose
     * @param responseObserver the server call observer whose handlers are set
     */
    private static void cancelSubscriptionOnCallEnd(Disposable subscription, ServerCallStreamObserver<?> responseObserver) {
        responseObserver.setOnCancelHandler(subscription::dispose);
        responseObserver.setOnCloseHandler(subscription::dispose);
    }
}

