package io.scalecube.services.transport.rsocket;

import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.scalecube.net.Address;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.transport.api.ServerTransport;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import java.net.InetSocketAddress;
import java.util.StringJoiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.tcp.TcpServer;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketServerTransport.class */
/**
 * {@link ServerTransport} implementation that serves RSocket over a Reactor Netty
 * {@link TcpServer}.
 *
 * <p>The listening channel is only available after the {@link Mono} returned by
 * {@link #bind(ServiceMethodRegistry)} has completed successfully; until then
 * {@link #address()} cannot be used and {@link #stop()} is a no-op.
 */
public class RSocketServerTransport implements ServerTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServerTransport.class);

    private final ServiceMessageCodec messageCodec;
    private final TcpServer tcpServer;

    /** Bound server channel; {@code null} until {@code bind} succeeds. */
    private CloseableChannel serverChannel;

    /**
     * Creates a transport that is not yet bound.
     *
     * @param serviceMessageCodec codec handed to the {@code RSocketServiceAcceptor} on bind
     * @param tcpServer Reactor Netty TCP server used as the underlying transport
     */
    public RSocketServerTransport(ServiceMessageCodec serviceMessageCodec, TcpServer tcpServer) {
        this.messageCodec = serviceMessageCodec;
        this.tcpServer = tcpServer;
    }

    /**
     * Returns the host and port the server channel is listening on.
     *
     * @return address built from the bound channel's socket address
     * @throws NullPointerException if called before the transport has been bound
     */
    public Address address() {
        InetSocketAddress socketAddress = this.serverChannel.address();
        return Address.create(socketAddress.getHostString(), socketAddress.getPort());
    }

    /**
     * Binds an RSocket server on top of the configured TCP server.
     *
     * <p>The work is deferred: nothing is started until the returned {@link Mono} is
     * subscribed. On success the bound channel is remembered so that {@link #address()}
     * and {@link #stop()} can use it.
     *
     * @param serviceMethodRegistry registry passed to the {@code RSocketServiceAcceptor}
     * @return a mono emitting this transport once the server channel is bound
     */
    public Mono<ServerTransport> bind(ServiceMethodRegistry serviceMethodRegistry) {
        return Mono.defer(() -> {
            // Log every accepted connection and, once it is disposed, its closure.
            TcpServer loggingServer = this.tcpServer.doOnConnection(connection -> {
                LOGGER.debug("Accepted connection on {}", connection.channel());
                connection.onDispose(
                    () -> LOGGER.debug("Connection closed on {}", connection.channel()));
            });

            return RSocketServer.create()
                .acceptor(new RSocketServiceAcceptor(this.messageCodec, serviceMethodRegistry))
                .payloadDecoder(PayloadDecoder.DEFAULT)
                .bind(TcpServerTransport.create(loggingServer))
                .doOnSuccess(closeableChannel -> this.serverChannel = closeableChannel)
                .thenReturn(this);
        });
    }

    /**
     * Disposes the server channel, if one was bound.
     *
     * <p>The work is deferred until subscription. When the transport was never bound the
     * returned mono completes immediately.
     *
     * @return a mono mirroring the channel's {@code onClose()} signal, or an empty mono
     *     when there is no channel
     */
    public Mono<Void> stop() {
        return Mono.defer(() -> {
            // Read the field once so the null check and the calls below see the same channel.
            CloseableChannel channel = this.serverChannel;
            if (channel == null) {
                return Mono.empty();
            }
            channel.dispose();
            // The trailing throwable argument makes SLF4J log the stack trace as well,
            // while the rendered message text stays the same as before.
            return channel.onClose().doOnError(
                th -> LOGGER.warn("Failed to close server: {}", th.toString(), th));
        });
    }

    /**
     * Returns a diagnostic description listing the codec, TCP server and server channel.
     */
    @Override
    public String toString() {
        return new StringJoiner(", ", RSocketServerTransport.class.getSimpleName() + "[", "]")
            .add("messageCodec=" + this.messageCodec)
            .add("tcpServer=" + this.tcpServer)
            .add("serverChannel=" + this.serverChannel)
            .toString();
    }
}
