package io.scalecube.services.transport.rsocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.auth.Authenticator;
import io.scalecube.services.auth.ConnectionSetup;
import io.scalecube.services.auth.ConnectionSetupCodec;
import io.scalecube.services.exceptions.BadRequestException;
import io.scalecube.services.exceptions.MessageCodecException;
import io.scalecube.services.exceptions.ServiceException;
import io.scalecube.services.exceptions.ServiceUnavailableException;
import io.scalecube.services.exceptions.UnauthorizedException;
import io.scalecube.services.methods.ServiceMethodInvoker;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import java.util.Collection;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketServiceAcceptor.class */
/**
 * {@link SocketAcceptor} that turns an incoming RSocket connection into a request handler backed
 * by a {@link ServiceMethodRegistry}.
 *
 * <p>On {@link #accept(ConnectionSetupPayload, RSocket)} the setup payload data is decoded into a
 * {@link ConnectionSetup} (when it is readable), the configured {@link Authenticator} is applied
 * to its credentials (when both are present), and a handler is created which carries the
 * resulting auth data, if any.
 */
public class RSocketServiceAcceptor implements SocketAcceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceAcceptor.class);

    private final ConnectionSetupCodec connectionSetupCodec;
    private final HeadersCodec headersCodec;
    private final Collection<DataCodec> dataCodecs;
    private final Authenticator<Object> authenticator;
    private final ServiceMethodRegistry methodRegistry;

    /**
     * Creates the acceptor.
     *
     * @param connectionSetupCodec codec used to decode the connection setup payload data
     * @param headersCodec headers codec handed to the per-connection {@link ServiceMessageCodec}
     * @param collection data codecs handed to the per-connection {@link ServiceMessageCodec}
     * @param authenticator authenticator applied to connection credentials; when {@code null}
     *     authentication is skipped
     * @param serviceMethodRegistry registry used to look up invokers by message qualifier
     */
    public RSocketServiceAcceptor(ConnectionSetupCodec connectionSetupCodec, HeadersCodec headersCodec, Collection<DataCodec> collection, Authenticator<Object> authenticator, ServiceMethodRegistry serviceMethodRegistry) {
        this.connectionSetupCodec = connectionSetupCodec;
        this.headersCodec = headersCodec;
        this.dataCodecs = collection;
        this.authenticator = authenticator;
        this.methodRegistry = serviceMethodRegistry;
    }

    /**
     * Accepts a connection: decodes the setup data, authenticates if applicable and emits the
     * request handler for this connection.
     *
     * @param connectionSetupPayload setup payload sent by the connecting side
     * @param rSocket the requester-side socket of the connection (used here for logging only)
     * @return a mono emitting the handler; it carries the authentication result, or no auth data
     *     when the authentication step completed empty
     */
    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
        LOGGER.info("[rsocket][accept][{}] Setup: {}", rSocket, connectionSetupPayload);
        return Mono.justOrEmpty(decodeConnectionSetup(connectionSetupPayload.data()))
            .flatMap(connectionSetup -> authenticate(rSocket, connectionSetup))
            .flatMap(this::newRSocket)
            // No setup data, no authenticator, no credentials, or an empty authentication result:
            // fall back to a handler without auth data.
            .switchIfEmpty(newRSocket(null));
    }

    /**
     * Lazily creates a request handler bound to the given auth data.
     *
     * @param authData authentication result to expose to invocations, or {@code null}
     * @return a mono which instantiates the handler on subscription
     */
    private Mono<RSocket> newRSocket(@Nullable Object authData) {
        return Mono.fromCallable(
            () -> new RSocketImpl(authData, new ServiceMessageCodec(this.headersCodec, this.dataCodecs), this.methodRegistry));
    }

    /**
     * Decodes the connection setup from the setup payload data.
     *
     * @param byteBuf setup payload data
     * @return the decoded setup, or {@code null} when the buffer is not readable
     * @throws MessageCodecException when decoding (or closing the stream) fails; the buffer is
     *     released before the exception is thrown
     */
    private ConnectionSetup decodeConnectionSetup(ByteBuf byteBuf) {
        if (!byteBuf.isReadable()) {
            return null;
        }
        // Second constructor argument is 'false', exactly as before this rewrite.
        try (ByteBufInputStream stream = new ByteBufInputStream(byteBuf, false)) {
            return this.connectionSetupCodec.decode(stream);
        } catch (Throwable ex) {
            ReferenceCountUtil.safestRelease(byteBuf);
            throw new MessageCodecException("Failed to decode ConnectionSetup", ex);
        }
    }

    /**
     * Applies the authenticator to the credentials of the connection setup.
     *
     * @param rSocket socket of the connection (used for logging only)
     * @param connectionSetup decoded connection setup, may be {@code null}
     * @return the authentication result; empty when there is no authenticator, no setup or no
     *     credentials; errors are mapped to {@link UnauthorizedException}
     */
    private Mono<Object> authenticate(RSocket rSocket, ConnectionSetup connectionSetup) {
        if (this.authenticator == null || connectionSetup == null || !connectionSetup.hasCredentials()) {
            return Mono.empty();
        }
        // NOTE(review): Authenticator's declared return type is not visible from this file. The
        // cast pins the result to Mono<Object> so the operator chain below is type-checked
        // instead of going through a raw Mono; drop it if apply(..) already returns Mono<Object>.
        @SuppressWarnings("unchecked")
        Mono<Object> authentication = (Mono<Object>) this.authenticator.apply(connectionSetup.credentials());
        return authentication
            .doOnSuccess(authData -> LOGGER.debug("[rsocket][authenticate][{}] Authenticated", rSocket))
            .doOnError(ex -> LOGGER.error("[rsocket][authenticate][{}] Exception occurred: {}", rSocket, ex.toString()))
            .onErrorMap(this::toUnauthorizedException);
    }

    /**
     * Converts an authentication failure into an {@link UnauthorizedException}. A
     * {@link ServiceException} contributes its error code and message; any other throwable is
     * wrapped as the cause.
     */
    private UnauthorizedException toUnauthorizedException(Throwable th) {
        if (th instanceof ServiceException) {
            ServiceException serviceException = (ServiceException) th;
            return new UnauthorizedException(serviceException.errorCode(), serviceException.getMessage());
        }
        return new UnauthorizedException(th);
    }

    /**
     * Per-connection request handler: decodes payloads into {@link ServiceMessage}s, dispatches
     * them to the invoker registered for the message qualifier and encodes the responses.
     */
    private static class RSocketImpl implements RSocket {
        /** Reactor context key under which the connection's auth data is published. */
        private static final String AUTH_CONTEXT_KEY = "auth.context";

        private final Object authData;
        private final ServiceMessageCodec messageCodec;
        private final ServiceMethodRegistry methodRegistry;

        private RSocketImpl(@Nullable Object obj, ServiceMessageCodec serviceMessageCodec, ServiceMethodRegistry serviceMethodRegistry) {
            this.authData = obj;
            this.messageCodec = serviceMessageCodec;
            this.methodRegistry = serviceMethodRegistry;
        }

        /** Handles a request/response interaction: one request message, at most one response. */
        @Override
        public Mono<Payload> requestResponse(Payload payload) {
            return Mono.deferWithContext(context -> Mono.just(toMessage(payload)))
                .doOnNext(this::validateRequest)
                .flatMap(request -> resolveInvoker(request)
                    .invokeOne(request)
                    .doOnNext(response -> releaseRequestOnError(request, response)))
                .map(this::toPayload)
                .doOnError(ex -> LOGGER.error("[requestResponse] Exception occurred: {}", ex.toString()))
                .subscriberContext(this::enhanceContextWithAuthData);
        }

        /** Handles a request/stream interaction: one request message, a stream of responses. */
        @Override
        public Flux<Payload> requestStream(Payload payload) {
            return Mono.deferWithContext(context -> Mono.just(toMessage(payload)))
                .doOnNext(this::validateRequest)
                .flatMapMany(request -> resolveInvoker(request)
                    .invokeMany(request)
                    .doOnNext(response -> releaseRequestOnError(request, response)))
                .map(this::toPayload)
                .doOnError(ex -> LOGGER.error("[requestStream] Exception occurred: {}", ex.toString()))
                .subscriberContext(this::enhanceContextWithAuthData);
        }

        /**
         * Handles a bidirectional interaction. The first incoming message selects the invoker;
         * the whole incoming stream is then handed to that invoker.
         */
        @Override
        public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
            return Flux.deferWithContext(context -> Flux.from(publisher))
                .map(this::toMessage)
                .switchOnFirst((first, requests) -> {
                    if (!first.hasValue()) {
                        // First signal is completion or an error: nothing to dispatch on.
                        return requests;
                    }
                    ServiceMessage request = first.get();
                    validateRequest(request);
                    // Same invoker check as requestResponse/requestStream, so an unknown
                    // qualifier yields ServiceUnavailableException rather than an NPE.
                    return resolveInvoker(request)
                        .invokeBidirectional(requests)
                        .doOnNext(response -> releaseRequestOnError(request, response));
                })
                .map(this::toPayload)
                .doOnError(ex -> LOGGER.error("[requestChannel] Exception occurred: {}", ex.toString()))
                .subscriberContext(this::enhanceContextWithAuthData);
        }

        /** Encodes a response message into an RSocket payload. */
        private Payload toPayload(ServiceMessage serviceMessage) {
            return this.messageCodec.encodeAndTransform(serviceMessage, ByteBufPayload::create);
        }

        /**
         * Decodes a payload into a message. The data and metadata slices are retained for the
         * codec and the payload itself is always released afterwards.
         */
        private ServiceMessage toMessage(Payload payload) {
            try {
                return this.messageCodec.decode(payload.sliceData().retain(), payload.sliceMetadata().retain());
            } finally {
                payload.release();
            }
        }

        /** Publishes the auth data into the subscriber context when there is any. */
        private Context enhanceContextWithAuthData(Context context) {
            if (this.authData == null) {
                return context;
            }
            return context.put(AUTH_CONTEXT_KEY, this.authData);
        }

        /**
         * Looks up the invoker for the request qualifier.
         *
         * @throws ServiceUnavailableException when no invoker is registered; the request data is
         *     released first
         */
        private ServiceMethodInvoker resolveInvoker(ServiceMessage request) {
            ServiceMethodInvoker invoker = this.methodRegistry.getInvoker(request.qualifier());
            validateMethodInvoker(invoker, request);
            return invoker;
        }

        /**
         * Rejects requests without a qualifier.
         *
         * @throws BadRequestException when the qualifier is {@code null}; the request data is
         *     released first
         */
        private void validateRequest(ServiceMessage serviceMessage) throws ServiceException {
            if (serviceMessage.qualifier() == null) {
                releaseRequest(serviceMessage);
                LOGGER.error("[qualifier is null] Invocation failed for {}", serviceMessage);
                throw new BadRequestException("Qualifier is null");
            }
        }

        /**
         * Rejects requests for which no invoker was found.
         *
         * @throws ServiceUnavailableException when {@code serviceMethodInvoker} is {@code null};
         *     the request data is released first
         */
        private void validateMethodInvoker(ServiceMethodInvoker serviceMethodInvoker, ServiceMessage serviceMessage) {
            if (serviceMethodInvoker == null) {
                releaseRequest(serviceMessage);
                LOGGER.error("[no service invoker found] Invocation failed for {}", serviceMessage);
                throw new ServiceUnavailableException("No service invoker found");
            }
        }

        /** Releases the data of the given request message. */
        private void releaseRequest(ServiceMessage serviceMessage) {
            ReferenceCountUtil.safestRelease(serviceMessage.data());
        }

        /**
         * Releases the request data when the response is an error message.
         *
         * @param serviceMessage the request whose data may need releasing
         * @param serviceMessage2 the response produced for that request
         */
        private void releaseRequestOnError(ServiceMessage serviceMessage, ServiceMessage serviceMessage2) {
            if (serviceMessage2.isError()) {
                releaseRequest(serviceMessage);
            }
        }
    }
}
