package io.scalecube.services.transport.rsocket.server;

import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.codec.ServiceMessageCodec;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.transport.server.api.ServerTransport;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.options.ServerOptions;
import reactor.ipc.netty.tcp.TcpServer;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/server/RSocketServerTransport.class */
/**
 * {@link ServerTransport} implementation that exposes services over RSocket on top of a
 * reactor-netty TCP server.
 *
 * <p>Accepted connections are tracked in {@link #channels} so that {@link #stop()} can dispose
 * them together with the listening server.
 */
public class RSocketServerTransport implements ServerTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServerTransport.class);

    private final ServiceMessageCodec codec;
    /** Connections accepted so far and not yet closed. */
    private final List<NettyContext> channels = new CopyOnWriteArrayList<>();
    /** Handle of the running server; {@code null} until {@link #bindAwait} has completed. */
    private NettyContextCloseable server;

    /**
     * Creates a transport that hands the given codec to the RSocket acceptor.
     *
     * @param serviceMessageCodec codec passed to {@link RSocketServiceAcceptor}
     */
    public RSocketServerTransport(ServiceMessageCodec serviceMessageCodec) {
        this.codec = serviceMessageCodec;
    }

    /**
     * Starts the RSocket server and blocks until it is bound.
     *
     * @param inetSocketAddress address to listen on; when {@code null} the reactor-netty default
     *     binding is kept
     * @param serviceMethodRegistry registry passed to {@link RSocketServiceAcceptor}
     * @return the address reported by the started server
     */
    @Override // io.scalecube.services.transport.server.api.ServerTransport
    public InetSocketAddress bindAwait(InetSocketAddress inetSocketAddress, ServiceMethodRegistry serviceMethodRegistry) {
        TcpServer tcpServer = TcpServer.create(builder -> configureServer(builder, inetSocketAddress));

        this.server = (NettyContextCloseable) RSocketFactory.receive()
            // Retain the slices so the payload stays valid after the frame itself is released.
            .frameDecoder(frame -> ByteBufPayload.create(frame.sliceData().retain(), frame.sliceMetadata().retain()))
            .acceptor(new RSocketServiceAcceptor(this.codec, serviceMethodRegistry))
            .transport(TcpServerTransport.create(tcpServer))
            .start()
            .block();
        return this.server.address();
    }

    /**
     * Disposes the server and every tracked connection.
     *
     * <p>Disposal is triggered when this method is called; the returned {@link Mono} only signals
     * completion of the shutdown.
     *
     * @return a {@link Mono} completing once the server and all tracked connections have closed,
     *     or an empty {@link Mono} if the server was never started
     */
    @Override // io.scalecube.services.transport.server.api.ServerTransport
    public Mono<Void> stop() {
        if (this.server == null) {
            return Mono.empty();
        }
        this.server.dispose();
        Mono<Void> serverClosed = this.server.onClose();
        List<Mono<Void>> channelsClosed = this.channels.stream()
            .map(nettyContext -> {
                nettyContext.dispose();
                return nettyContext.onClose();
            })
            .collect(Collectors.toList());
        return serverClosed.then(Mono.when(channelsClosed));
    }

    /** Applies the listen address and installs connection tracking on the server options. */
    private void configureServer(ServerOptions.Builder<?> builder, InetSocketAddress address) {
        if (address != null) {
            builder.listenAddress(address);
        }
        builder.afterNettyContextInit(this::trackChannel);
    }

    /** Remembers an accepted connection and forgets it again once it closes. */
    private void trackChannel(NettyContext nettyContext) {
        LOGGER.debug("Accepted connection on {}", nettyContext.channel());
        // Add before registering the close hook so an immediately-closed channel is still removed.
        this.channels.add(nettyContext);
        nettyContext.onClose(() -> {
            LOGGER.debug("Connection closed on {}", nettyContext.channel());
            this.channels.remove(nettyContext);
        });
    }
}
