package io.scalecube.services.transport.rsocket;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.scalecube.services.auth.Authenticator;
import io.scalecube.services.auth.CredentialsSupplier;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.transport.api.ClientTransport;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
import io.scalecube.services.transport.api.ServerTransport;
import io.scalecube.services.transport.api.ServiceTransport;
import java.util.Collection;
import java.util.StringJoiner;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.netty.FutureMono;
import reactor.netty.channel.AbortedException;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketServiceTransport.class */
public class RSocketServiceTransport implements ServiceTransport {
    public static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceTransport.class);
    private int numOfWorkers;
    private HeadersCodec headersCodec;
    private Collection<DataCodec> dataCodecs;
    private ConnectionSetupCodec connectionSetupCodec;
    private CredentialsSupplier credentialsSupplier;
    private Authenticator<Object> authenticator;
    private Function<LoopResources, RSocketServerTransportFactory> serverTransportFactory;
    private Function<LoopResources, RSocketClientTransportFactory> clientTransportFactory;
    private EventLoopGroup eventLoopGroup;
    private LoopResources clientLoopResources;
    private LoopResources serverLoopResources;

    public RSocketServiceTransport() {
        this.numOfWorkers = Runtime.getRuntime().availableProcessors();
        this.headersCodec = HeadersCodec.DEFAULT_INSTANCE;
        this.dataCodecs = DataCodec.getAllInstances();
        this.connectionSetupCodec = ConnectionSetupCodec.DEFAULT_INSTANCE;
        this.serverTransportFactory = RSocketServerTransportFactory.websocket();
        this.clientTransportFactory = RSocketClientTransportFactory.websocket();
    }

    private RSocketServiceTransport(RSocketServiceTransport rSocketServiceTransport) {
        this.numOfWorkers = Runtime.getRuntime().availableProcessors();
        this.headersCodec = HeadersCodec.DEFAULT_INSTANCE;
        this.dataCodecs = DataCodec.getAllInstances();
        this.connectionSetupCodec = ConnectionSetupCodec.DEFAULT_INSTANCE;
        this.serverTransportFactory = RSocketServerTransportFactory.websocket();
        this.clientTransportFactory = RSocketClientTransportFactory.websocket();
        this.numOfWorkers = rSocketServiceTransport.numOfWorkers;
        this.headersCodec = rSocketServiceTransport.headersCodec;
        this.dataCodecs = rSocketServiceTransport.dataCodecs;
        this.connectionSetupCodec = rSocketServiceTransport.connectionSetupCodec;
        this.credentialsSupplier = rSocketServiceTransport.credentialsSupplier;
        this.authenticator = rSocketServiceTransport.authenticator;
        this.eventLoopGroup = rSocketServiceTransport.eventLoopGroup;
        this.clientLoopResources = rSocketServiceTransport.clientLoopResources;
        this.serverLoopResources = rSocketServiceTransport.serverLoopResources;
        this.serverTransportFactory = rSocketServiceTransport.serverTransportFactory;
        this.clientTransportFactory = rSocketServiceTransport.clientTransportFactory;
    }

    public RSocketServiceTransport numOfWorkers(int i) {
        RSocketServiceTransport rSocketServiceTransport = new RSocketServiceTransport(this);
        rSocketServiceTransport.numOfWorkers = i;
        return rSocketServiceTransport;
    }

    public RSocketServiceTransport headersCodec(HeadersCodec headersCodec) {
        RSocketServiceTransport rSocketServiceTransport = new RSocketServiceTransport(this);
        rSocketServiceTransport.headersCodec = headersCodec;
        return rSocketServiceTransport;
    }

    public RSocketServiceTransport dataCodecs(Collection<DataCodec> collection) {
        RSocketServiceTransport rSocketServiceTransport = new RSocketServiceTransport(this);
        rSocketServiceTransport.dataCodecs = collection;
        return rSocketServiceTransport;
    }

    public RSocketServiceTransport connectionSetupCodec(ConnectionSetupCodec connectionSetupCodec) {
        RSocketServiceTransport rSocketServiceTransport = new RSocketServiceTransport(this);
        rSocketServiceTransport.connectionSetupCodec = connectionSetupCodec;
        return rSocketServiceTransport;
    }

    public RSocketServiceTransport credentialsSupplier(CredentialsSupplier credentialsSupplier) {
        RSocketServiceTransport rSocketServiceTransport = new RSocketServiceTransport(this);
        rSocketServiceTransport.credentialsSupplier = credentialsSupplier;
        return rSocketServiceTransport;
    }

    public <R> RSocketServiceTransport authenticator(Authenticator<? extends R> authenticator) {
        RSocketServiceTransport rSocketServiceTransport = new RSocketServiceTransport(this);
        rSocketServiceTransport.authenticator = authenticator;
        return rSocketServiceTransport;
    }

    public RSocketServiceTransport serverTransportFactory(Function<LoopResources, RSocketServerTransportFactory> function) {
        RSocketServiceTransport rSocketServiceTransport = new RSocketServiceTransport(this);
        rSocketServiceTransport.serverTransportFactory = function;
        return rSocketServiceTransport;
    }

    public RSocketServiceTransport clientTransportFactory(Function<LoopResources, RSocketClientTransportFactory> function) {
        RSocketServiceTransport rSocketServiceTransport = new RSocketServiceTransport(this);
        rSocketServiceTransport.clientTransportFactory = function;
        return rSocketServiceTransport;
    }

    public ClientTransport clientTransport() {
        return new RSocketClientTransport(this.credentialsSupplier, this.connectionSetupCodec, this.headersCodec, this.dataCodecs, this.clientTransportFactory.apply(this.clientLoopResources));
    }

    public ServerTransport serverTransport(ServiceMethodRegistry serviceMethodRegistry) {
        return new RSocketServerTransport(this.authenticator, serviceMethodRegistry, this.connectionSetupCodec, this.headersCodec, this.dataCodecs, this.serverTransportFactory.apply(this.serverLoopResources));
    }

    public ServiceTransport start() {
        this.eventLoopGroup = newEventLoopGroup();
        this.clientLoopResources = DelegatedLoopResources.newClientLoopResources(this.eventLoopGroup);
        this.serverLoopResources = DelegatedLoopResources.newServerLoopResources(this.eventLoopGroup);
        return this;
    }

    public void stop() {
        try {
            Flux.concatDelayError(new Publisher[]{Mono.defer(() -> {
                return this.serverLoopResources.disposeLater();
            }), Mono.defer(() -> {
                return FutureMono.from(this.eventLoopGroup.shutdownGracefully());
            })}).then().toFuture().get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private EventLoopGroup newEventLoopGroup() {
        DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("rsocket-worker", true);
        return LoopResources.colocate(Epoll.isAvailable() ? new EpollEventLoopGroup(this.numOfWorkers, defaultThreadFactory) : new NioEventLoopGroup(this.numOfWorkers, defaultThreadFactory));
    }

    public String toString() {
        return new StringJoiner(", ", RSocketServiceTransport.class.getSimpleName() + "[", "]").add("numOfWorkers=" + this.numOfWorkers).add("headersCodec=" + this.headersCodec).add("dataCodecs=" + this.dataCodecs).add("connectionSetupCodec=" + this.connectionSetupCodec).add("serverTransportFactory=" + this.serverTransportFactory).add("clientTransportFactory=" + this.clientTransportFactory).toString();
    }

    static {
        Hooks.onErrorDropped(th -> {
            if ((AbortedException.isConnectionReset(th) || ConnectionClosedException.isConnectionClosed(th)) && LOGGER.isDebugEnabled()) {
                LOGGER.debug("Connection aborted: {}", th.toString());
            }
        });
    }
}
