package io.scalecube.services.transport.rsocket;

import io.netty.buffer.ByteBuf;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.scalecube.services.auth.Authenticator;
import io.scalecube.services.auth.Principal;
import io.scalecube.services.exceptions.UnauthorizedException;
import io.scalecube.services.registry.api.ServiceRegistry;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketServiceAcceptor.class */
public class RSocketServiceAcceptor implements SocketAcceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceAcceptor.class);
    private final HeadersCodec headersCodec;
    private final Collection<DataCodec> dataCodecs;
    private final Authenticator authenticator;
    private final ServiceRegistry serviceRegistry;

    public RSocketServiceAcceptor(HeadersCodec headersCodec, Collection<DataCodec> collection, Authenticator authenticator, ServiceRegistry serviceRegistry) {
        this.headersCodec = headersCodec;
        this.dataCodecs = collection;
        this.authenticator = authenticator;
        this.serviceRegistry = serviceRegistry;
    }

    public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
        return Mono.defer(() -> {
            return authenticate(connectionSetupPayload.data());
        }).map(this::newRSocket);
    }

    private Mono<Principal> authenticate(ByteBuf byteBuf) {
        if (this.authenticator == null || !byteBuf.isReadable()) {
            return Mono.just(Principal.NULL_PRINCIPAL);
        }
        byte[] bArr = new byte[byteBuf.readableBytes()];
        byteBuf.getBytes(byteBuf.readerIndex(), bArr);
        return Mono.defer(() -> {
            return this.authenticator.authenticate(bArr);
        }).switchIfEmpty(Mono.just(Principal.NULL_PRINCIPAL)).doOnSuccess(principal -> {
            LOGGER.debug("Authenticated successfully: {}", principal);
        }).doOnError(th -> {
            LOGGER.error("Authentication failed", th);
        }).onErrorMap(th2 -> {
            return new UnauthorizedException("Authentication failed");
        });
    }

    private RSocket newRSocket(Principal principal) {
        return new RSocketImpl(principal, new ServiceMessageCodec(this.headersCodec, this.dataCodecs), this.serviceRegistry);
    }
}
