package io.scalecube.services.transport.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.RequestContext;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.auth.Principal;
import io.scalecube.services.exceptions.ServiceUnavailableException;
import io.scalecube.services.methods.ServiceMethodInvoker;
import io.scalecube.services.registry.api.ServiceRegistry;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketImpl.class */
public class RSocketImpl implements RSocket {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketImpl.class);
    private final Principal principal;
    private final ServiceMessageCodec messageCodec;
    private final ServiceRegistry serviceRegistry;

    public RSocketImpl(Principal principal, ServiceMessageCodec serviceMessageCodec, ServiceRegistry serviceRegistry) {
        this.principal = principal;
        this.messageCodec = serviceMessageCodec;
        this.serviceRegistry = serviceRegistry;
    }

    public Mono<Payload> requestResponse(Payload payload) {
        return Mono.defer(() -> {
            ServiceMessage message = toMessage(payload);
            return lookupInvoker(message).flatMap(serviceMethodInvoker -> {
                return serviceMethodInvoker.invokeOne(message).doOnNext(serviceMessage -> {
                    releaseOnError(message, serviceMessage);
                }).contextWrite(requestContext(message));
            });
        }).map(this::toPayload).doOnError(th -> {
            LOGGER.error("[requestResponse] Exception occurred", th);
        });
    }

    public Flux<Payload> requestStream(Payload payload) {
        return Flux.defer(() -> {
            ServiceMessage message = toMessage(payload);
            return lookupInvoker(message).flatMapMany(serviceMethodInvoker -> {
                return serviceMethodInvoker.invokeMany(message).doOnNext(serviceMessage -> {
                    releaseOnError(message, serviceMessage);
                }).contextWrite(requestContext(message));
            });
        }).map(this::toPayload).doOnError(th -> {
            LOGGER.error("[requestStream] Exception occurred", th);
        });
    }

    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return Flux.from(publisher).map(this::toMessage).switchOnFirst((signal, flux) -> {
            if (!signal.hasValue()) {
                return flux;
            }
            ServiceMessage serviceMessage = (ServiceMessage) signal.get();
            return lookupInvoker(serviceMessage).flatMapMany(serviceMethodInvoker -> {
                return serviceMethodInvoker.invokeBidirectional(flux).doOnNext(serviceMessage2 -> {
                    releaseOnError(serviceMessage, serviceMessage2);
                }).contextWrite(requestContext(serviceMessage));
            });
        }).map(this::toPayload).doOnError(th -> {
            LOGGER.error("[requestChannel] Exception occurred", th);
        });
    }

    private Mono<ServiceMethodInvoker> lookupInvoker(ServiceMessage serviceMessage) {
        return Mono.fromCallable(() -> {
            ServiceMethodInvoker lookupInvoker = this.serviceRegistry.lookupInvoker(serviceMessage);
            if (lookupInvoker == null) {
                throw new ServiceUnavailableException("No service invoker found");
            }
            return lookupInvoker;
        }).doOnError(th -> {
            ReferenceCountUtil.safestRelease(serviceMessage.data());
        });
    }

    private Payload toPayload(ServiceMessage serviceMessage) {
        return (Payload) this.messageCodec.encodeAndTransform(serviceMessage, ByteBufPayload::create);
    }

    private ServiceMessage toMessage(Payload payload) {
        try {
            return this.messageCodec.decode(payload.sliceData().retain(), payload.sliceMetadata().retain());
        } finally {
            payload.release();
        }
    }

    private RequestContext requestContext(ServiceMessage serviceMessage) {
        return new RequestContext().headers(serviceMessage.headers()).principal(this.principal);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void releaseOnError(ServiceMessage serviceMessage, ServiceMessage serviceMessage2) {
        if (serviceMessage2.isError()) {
            ReferenceCountUtil.safestRelease(serviceMessage.data());
        }
    }
}
