package io.scalecube.services.transport.rsocket;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.transport.api.ClientTransport;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import io.scalecube.services.transport.api.ServerTransport;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import io.scalecube.services.transport.api.ServiceTransport;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.netty.FutureMono;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketServiceTransport.class */
/**
 * {@link ServiceTransport} that creates {@code RSocketClientTransport} and
 * {@code RSocketServerTransport} instances on top of reactor-netty TCP.
 *
 * <p>Configuration methods ({@link #numOfWorkers(int)}, {@link #headersCodec(HeadersCodec)},
 * {@link #dataCodecs(Collection)}, {@link #tcpServer(Function)}, {@link #tcpClient(Function)})
 * never modify the instance they are called on: each one returns a copy with a single setting
 * changed. The copy shares the event loop group and loop resources references of its source.
 *
 * <p>The event loop group and both loop resources are created by {@link #start()} and are
 * {@code null} before that. {@link #stop()} releases them.
 *
 * <p>Loading this class installs a global Reactor {@code onNextDropped} hook (see the static
 * initializer at the bottom of the class).
 */
public class RSocketServiceTransport implements ServiceTransport {
    /** Thread count handed to the event loop group constructor in {@link #newEventLoopGroup()}. */
    private int numOfWorkers;
    private HeadersCodec headersCodec;
    private Collection<DataCodec> dataCodecs;
    /** Builds the {@link TcpServer} for {@link #serverTransport()} from the server loop resources. */
    private Function<LoopResources, TcpServer> tcpServerProvider;
    /** Builds the {@link TcpClient} for {@link #clientTransport()} from the client loop resources. */
    private Function<LoopResources, TcpClient> tcpClientProvider;
    // The three fields below are populated by start0(); they stay null until start() has run.
    private EventLoopGroup eventLoopGroup;
    private LoopResources clientLoopResources;
    private LoopResources serverLoopResources;

    /**
     * Creates a transport with default settings: one worker per available processor, a TCP server
     * bound to port 0, and a TCP client that opens a new connection per use.
     */
    public RSocketServiceTransport() {
        this.numOfWorkers = Runtime.getRuntime().availableProcessors();
        this.tcpServerProvider = defaultTcpServerProvider();
        this.tcpClientProvider = defaultTcpClientProvider();
    }

    /**
     * Copy constructor backing the configuration methods.
     *
     * <p>Every field is taken from {@code rSocketServiceTransport}, so no defaults are computed
     * here; they would be overwritten immediately.
     *
     * @param rSocketServiceTransport instance whose state is copied (references are shared)
     */
    private RSocketServiceTransport(RSocketServiceTransport rSocketServiceTransport) {
        this.numOfWorkers = rSocketServiceTransport.numOfWorkers;
        this.headersCodec = rSocketServiceTransport.headersCodec;
        this.dataCodecs = rSocketServiceTransport.dataCodecs;
        this.eventLoopGroup = rSocketServiceTransport.eventLoopGroup;
        this.clientLoopResources = rSocketServiceTransport.clientLoopResources;
        this.serverLoopResources = rSocketServiceTransport.serverLoopResources;
        this.tcpServerProvider = rSocketServiceTransport.tcpServerProvider;
        this.tcpClientProvider = rSocketServiceTransport.tcpClientProvider;
    }

    /**
     * Returns a copy of this transport that uses the given number of worker threads.
     *
     * @param i thread count passed to the event loop group created by {@link #start()}
     * @return new transport instance; this instance is left unchanged
     */
    public RSocketServiceTransport numOfWorkers(int i) {
        RSocketServiceTransport copy = new RSocketServiceTransport(this);
        copy.numOfWorkers = i;
        return copy;
    }

    /**
     * Returns a copy of this transport that uses the given headers codec.
     *
     * @param headersCodec codec handed to the {@link ServiceMessageCodec} of created transports
     * @return new transport instance; this instance is left unchanged
     */
    public RSocketServiceTransport headersCodec(HeadersCodec headersCodec) {
        RSocketServiceTransport copy = new RSocketServiceTransport(this);
        copy.headersCodec = headersCodec;
        return copy;
    }

    /**
     * Returns a copy of this transport that uses the given data codecs.
     *
     * @param collection codecs handed to the {@link ServiceMessageCodec} of created transports;
     *     the collection is stored as-is, not copied
     * @return new transport instance; this instance is left unchanged
     */
    public RSocketServiceTransport dataCodecs(Collection<DataCodec> collection) {
        RSocketServiceTransport copy = new RSocketServiceTransport(this);
        copy.dataCodecs = collection;
        return copy;
    }

    /**
     * Returns a copy of this transport that builds its {@link TcpServer} with the given function.
     *
     * @param function receives the server loop resources and returns the server to use
     * @return new transport instance; this instance is left unchanged
     */
    public RSocketServiceTransport tcpServer(Function<LoopResources, TcpServer> function) {
        RSocketServiceTransport copy = new RSocketServiceTransport(this);
        copy.tcpServerProvider = function;
        return copy;
    }

    /**
     * Returns a copy of this transport that builds its {@link TcpClient} with the given function.
     *
     * @param function receives the client loop resources and returns the client to use
     * @return new transport instance; this instance is left unchanged
     */
    public RSocketServiceTransport tcpClient(Function<LoopResources, TcpClient> function) {
        RSocketServiceTransport copy = new RSocketServiceTransport(this);
        copy.tcpClientProvider = function;
        return copy;
    }

    /**
     * Creates a client transport from the configured codecs and TCP client provider.
     *
     * <p>The provider is applied to the client loop resources, which are {@code null} until
     * {@link #start()} has run.
     *
     * @return a new {@code RSocketClientTransport}
     */
    public ClientTransport clientTransport() {
        ServiceMessageCodec messageCodec = new ServiceMessageCodec(this.headersCodec, this.dataCodecs);
        TcpClient tcpClient = this.tcpClientProvider.apply(this.clientLoopResources);
        return new RSocketClientTransport(messageCodec, tcpClient);
    }

    /**
     * Creates a server transport from the configured codecs and TCP server provider.
     *
     * <p>The provider is applied to the server loop resources, which are {@code null} until
     * {@link #start()} has run.
     *
     * @return a new {@code RSocketServerTransport}
     */
    public ServerTransport serverTransport() {
        ServiceMessageCodec messageCodec = new ServiceMessageCodec(this.headersCodec, this.dataCodecs);
        TcpServer tcpServer = this.tcpServerProvider.apply(this.serverLoopResources);
        return new RSocketServerTransport(messageCodec, tcpServer);
    }

    /**
     * Allocates the event loop group and the client/server loop resources.
     *
     * <p>Nothing happens until the returned Mono is subscribed. Every subscription runs the
     * allocation again and replaces the previously stored group and resources.
     *
     * @return Mono that emits this instance once the resources have been created
     */
    public Mono<RSocketServiceTransport> start() {
        return Mono.fromRunnable(this::start0).thenReturn(this);
    }

    /**
     * Disposes the server loop resources and then shuts the event loop group down.
     *
     * <p>The two steps run in sequence; a failure in the first does not prevent the second, and
     * errors are reported only after both were attempted. A step whose resource was never created
     * (transport not started) is skipped, so stopping an unstarted transport completes normally.
     *
     * @return Mono that completes when both steps have finished
     */
    public Mono<Void> stop() {
        return Flux.concatDelayError(
                Mono.defer(this::disposeServerLoopResources),
                Mono.defer(this::shutdownEventLoopGroup))
            .then();
    }

    /** Creates the shared event loop group and derives both loop resources from it. */
    private void start0() {
        this.eventLoopGroup = newEventLoopGroup();
        this.clientLoopResources = DelegatedLoopResources.newClientLoopResources(this.eventLoopGroup);
        this.serverLoopResources = DelegatedLoopResources.newServerLoopResources(this.eventLoopGroup);
    }

    /**
     * Builds the worker event loop group: epoll-based when epoll is available, NIO otherwise.
     *
     * @return the group wrapped by {@link LoopResources#colocate}
     */
    private EventLoopGroup newEventLoopGroup() {
        // Second argument requests daemon threads named after the "rsocket-worker" pool.
        DefaultThreadFactory threadFactory = new DefaultThreadFactory("rsocket-worker", true);
        EventLoopGroup workers;
        if (Epoll.isAvailable()) {
            workers = new EpollEventLoopGroup(this.numOfWorkers, threadFactory);
        } else {
            workers = new NioEventLoopGroup(this.numOfWorkers, threadFactory);
        }
        return LoopResources.colocate(workers);
    }

    /**
     * Disposes the server loop resources, or does nothing when none were created.
     *
     * <p>Meant to be invoked lazily (through {@code Mono.defer}) so the field is read at
     * subscription time.
     */
    private Mono<Void> disposeServerLoopResources() {
        LoopResources resources = this.serverLoopResources;
        if (resources == null) {
            return Mono.empty();
        }
        return resources.disposeLater();
    }

    /**
     * Starts a graceful shutdown of the event loop group, or does nothing when none was created.
     *
     * <p>Meant to be invoked lazily (through {@code Mono.defer}) so the shutdown only begins on
     * subscription.
     */
    private Mono<Void> shutdownEventLoopGroup() {
        EventLoopGroup group = this.eventLoopGroup;
        if (group == null) {
            return Mono.empty();
        }
        // NOTE(review): shutdownGracefully() is declared to return Future<?>; if FutureMono.from
        // in the reactor-netty version in use requires Future<Void>, an unchecked cast is needed
        // here - confirm against the build.
        return FutureMono.from(group.shutdownGracefully());
    }

    /** Default server: runs on the supplied loop resources and binds to port 0. */
    private static Function<LoopResources, TcpServer> defaultTcpServerProvider() {
        return loopResources ->
            TcpServer.create().runOn(loopResources).bindAddress(() -> new InetSocketAddress(0));
    }

    /** Default client: a new-connection client running on the supplied loop resources. */
    private static Function<LoopResources, TcpClient> defaultTcpClientProvider() {
        return loopResources -> TcpClient.newConnection().runOn(loopResources);
    }

    @Override
    public String toString() {
        return "RSocketServiceTransport{numOfWorkers=" + this.numOfWorkers + ", headersCodec=" + this.headersCodec + ", eventLoopGroup=" + this.eventLoopGroup + ", clientLoopResources=" + this.clientLoopResources + ", serverLoopResources=" + this.serverLoopResources + "}";
    }

    static {
        // Global hook: release whatever Reactor drops. For a ServiceMessage the payload returned
        // by data() is released; any other dropped object is passed to safestRelease directly.
        Hooks.onNextDropped(obj -> {
            Object toRelease = obj instanceof ServiceMessage ? ((ServiceMessage) obj).data() : obj;
            ReferenceCountUtil.safestRelease(toRelease);
        });
    }
}
