package io.rsocket.transport.netty.server;

import io.rsocket.DuplexConnection;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.RSocketLengthCodec;
import io.rsocket.transport.netty.TcpDuplexConnection;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.tcp.TcpServer;

/* loaded from: input_file:io/rsocket/transport/netty/server/ExtendedTcpServerTransport.class */
public class ExtendedTcpServerTransport implements ServerTransport<NettyContextCloseable> {
    private final TcpServer server;

    public ExtendedTcpServerTransport(TcpServer tcpServer) {
        this.server = tcpServer;
    }

    @Override // io.rsocket.transport.ServerTransport
    public Mono<NettyContextCloseable> start(ServerTransport.ConnectionAcceptor connectionAcceptor) {
        return this.server.newHandler((nettyInbound, nettyOutbound) -> {
            nettyInbound.context().addHandler(new RSocketLengthCodec());
            nettyOutbound.options((v0) -> {
                v0.flushOnEach();
            });
            connectionAcceptor.apply((DuplexConnection) new TcpDuplexConnection(nettyInbound, nettyOutbound, nettyInbound.context())).subscribe();
            return nettyOutbound.neverComplete();
        }).map(NettyContextCloseable::new);
    }
}
