package io.scalecube.services.transport.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.BadRequestException;
import io.scalecube.services.exceptions.ServiceException;
import io.scalecube.services.exceptions.ServiceUnavailableException;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import java.util.Optional;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketServiceAcceptor.class */
public class RSocketServiceAcceptor implements SocketAcceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceAcceptor.class);
    private final ServiceMessageCodec messageCodec;
    private final ServiceMethodRegistry methodRegistry;

    /* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketServiceAcceptor$AbstractRSocket0.class */
    private class AbstractRSocket0 extends AbstractRSocket {
        private AbstractRSocket0() {
        }

        public Mono<Payload> requestResponse(Payload payload) {
            return Mono.fromCallable(() -> {
                return toMessage(payload);
            }).doOnNext(this::validateRequest).flatMap(serviceMessage -> {
                return RSocketServiceAcceptor.this.methodRegistry.getInvoker(serviceMessage.qualifier()).invokeOne(serviceMessage, ServiceMessageCodec::decodeData);
            }).map(this::toPayload);
        }

        public Flux<Payload> requestStream(Payload payload) {
            return Mono.fromCallable(() -> {
                return toMessage(payload);
            }).doOnNext(this::validateRequest).flatMapMany(serviceMessage -> {
                return RSocketServiceAcceptor.this.methodRegistry.getInvoker(serviceMessage.qualifier()).invokeMany(serviceMessage, ServiceMessageCodec::decodeData);
            }).map(this::toPayload);
        }

        public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
            return Flux.from(publisher).map(this::toMessage).switchOnFirst((signal, flux) -> {
                if (!signal.hasValue()) {
                    return flux;
                }
                ServiceMessage serviceMessage = (ServiceMessage) signal.get();
                validateRequest(serviceMessage);
                return RSocketServiceAcceptor.this.methodRegistry.getInvoker(serviceMessage.qualifier()).invokeBidirectional(flux, ServiceMessageCodec::decodeData);
            }).map(this::toPayload);
        }

        private Payload toPayload(ServiceMessage serviceMessage) {
            return (Payload) RSocketServiceAcceptor.this.messageCodec.encodeAndTransform(serviceMessage, ByteBufPayload::create);
        }

        private ServiceMessage toMessage(Payload payload) {
            return RSocketServiceAcceptor.this.messageCodec.decode(payload.sliceData(), payload.sliceMetadata());
        }

        private void validateRequest(ServiceMessage serviceMessage) throws ServiceException {
            if (serviceMessage.qualifier() == null) {
                Optional.ofNullable(serviceMessage.data()).ifPresent(ReferenceCountUtil::safestRelease);
                RSocketServiceAcceptor.LOGGER.error("Failed to invoke service with msg={}: qualifier is null", serviceMessage);
                throw new BadRequestException("Qualifier is null in service msg request: " + serviceMessage);
            }
            if (RSocketServiceAcceptor.this.methodRegistry.containsInvoker(serviceMessage.qualifier())) {
                return;
            }
            Optional.ofNullable(serviceMessage.data()).ifPresent(ReferenceCountUtil::safestRelease);
            RSocketServiceAcceptor.LOGGER.error("Failed to invoke service with msg={}: no service invoker found by qualifier={}", serviceMessage, serviceMessage.qualifier());
            throw new ServiceUnavailableException("No service invoker found by qualifier=" + serviceMessage.qualifier());
        }
    }

    public RSocketServiceAcceptor(ServiceMessageCodec serviceMessageCodec, ServiceMethodRegistry serviceMethodRegistry) {
        this.messageCodec = serviceMessageCodec;
        this.methodRegistry = serviceMethodRegistry;
    }

    public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
        LOGGER.info("Accepted rSocket: {}, connectionSetup: {}", rSocket, connectionSetupPayload);
        return Mono.just(new AbstractRSocket0());
    }
}
