package io.scalecube.services.transport.rsocket;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.util.concurrent.EventExecutor;
import io.scalecube.services.transport.api.ClientTransport;
import io.scalecube.services.transport.api.HeadersCodec;
import io.scalecube.services.transport.api.ServerTransport;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import io.scalecube.services.transport.api.ServiceTransport;
import java.util.Iterator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.FutureMono;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketServiceTransport.class */
/**
 * {@link ServiceTransport} implementation that hands out RSocket client and server transports
 * sharing a single Netty {@link EventLoopGroup}.
 *
 * <p>The event loop group and both {@link LoopResources} wrappers are created when the
 * {@link #start()} publisher is subscribed, not in the constructor. Until then
 * {@link #clientTransport()} and {@link #serverTransport()} pass {@code null} loop resources to
 * the transports they create.
 */
public class RSocketServiceTransport implements ServiceTransport {
    /** Headers codec used by the no-arg constructor. */
    private static final HeadersCodec HEADERS_CODEC = HeadersCodec.getInstance("application/json");

    /** Worker count used by the no-arg constructor: one per available processor. */
    private static final int NUM_OF_WORKERS = Runtime.getRuntime().availableProcessors();

    private final int numOfWorkers;
    private final ServiceMessageCodec messageCodec;

    // Assigned by start0(); remain null until start() has been subscribed.
    private EventLoopGroup eventLoopGroup;
    private LoopResources clientLoopResources;
    private LoopResources serverLoopResources;

    /**
     * Creates a transport with one worker per available processor and the
     * {@code application/json} headers codec.
     */
    public RSocketServiceTransport() {
        this(NUM_OF_WORKERS, HEADERS_CODEC);
    }

    /**
     * Creates a transport with an explicit worker count and headers codec.
     *
     * @param numOfWorkers number of workers passed to the event loop group created on start
     * @param headersCodec codec wrapped into the {@link ServiceMessageCodec} shared by the client
     *     and server transports
     */
    public RSocketServiceTransport(int numOfWorkers, HeadersCodec headersCodec) {
        this.numOfWorkers = numOfWorkers;
        this.messageCodec = new ServiceMessageCodec(headersCodec);
    }

    /**
     * Creates a new client transport using the shared message codec and the client loop
     * resources.
     *
     * @return a new {@link RSocketClientTransport} on every call
     */
    public ClientTransport clientTransport() {
        return new RSocketClientTransport(this.messageCodec, this.clientLoopResources);
    }

    /**
     * Creates a new server transport using the shared message codec and the server loop
     * resources.
     *
     * @return a new {@link RSocketServerTransport} on every call
     */
    public ServerTransport serverTransport() {
        return new RSocketServerTransport(this.messageCodec, this.serverLoopResources);
    }

    /**
     * Returns a publisher that, on subscription, creates the event loop group and the loop
     * resources, then emits this instance.
     *
     * @return mono emitting this transport once its resources have been created
     */
    public Mono<RSocketServiceTransport> start() {
        return Mono.fromRunnable(this::start0).thenReturn(this);
    }

    /**
     * Returns a publisher that disposes the server loop resources and then shuts down the event
     * loop group. Both steps are attempted even if the first one fails; errors are delayed until
     * the end. Steps whose resource was never created (transport not started) are skipped.
     *
     * @return mono completing when both steps have finished
     */
    public Mono<Void> stop() {
        Mono<Void> disposeServer = Mono.defer(this::disposeServerLoopResources);
        Mono<Void> shutdownGroup = Mono.defer(this::shutdownEventLoopGroup);
        return Flux.concatDelayError(disposeServer, shutdownGroup).then();
    }

    /** Creates the event loop group and derives the client and server loop resources from it. */
    private void start0() {
        this.eventLoopGroup = eventLoopGroup();
        this.clientLoopResources = DelegatedLoopResources.newClientLoopResources(this.eventLoopGroup);
        this.serverLoopResources = DelegatedLoopResources.newServerLoopResources(this.eventLoopGroup);
    }

    /**
     * Builds the event loop group: the epoll variant when {@link Epoll#isAvailable()} reports
     * true, the NIO variant otherwise. Both receive {@link #chooseEventLoop} as their chooser.
     */
    private EventLoopGroup eventLoopGroup() {
        if (Epoll.isAvailable()) {
            return new ExtendedEpollEventLoopGroup(this.numOfWorkers, this::chooseEventLoop);
        }
        return new ExtendedNioEventLoopGroup(this.numOfWorkers, this::chooseEventLoop);
    }

    /**
     * Picks the executor the calling thread is currently running in, if any.
     *
     * @param channel channel being assigned; not consulted by this chooser
     * @param it executors to inspect
     * @return the first executor whose {@code inEventLoop()} is true, or {@code null} when the
     *     caller is not on any of them (presumably the group then falls back to its default
     *     selection; verify against the Extended*EventLoopGroup implementations)
     */
    private EventLoop chooseEventLoop(Channel channel, Iterator<EventExecutor> it) {
        while (it.hasNext()) {
            EventExecutor executor = it.next();
            if (executor.inEventLoop()) {
                // The iterator is typed as EventExecutor; the chooser contract needs an EventLoop.
                return (EventLoop) executor;
            }
        }
        return null;
    }

    /** Disposes the server loop resources, or completes immediately if they were never created. */
    private Mono<Void> disposeServerLoopResources() {
        LoopResources resources = this.serverLoopResources;
        if (resources == null) {
            return Mono.empty();
        }
        return resources.disposeLater();
    }

    /**
     * Gracefully shuts down the event loop group on subscription, or completes immediately if the
     * group was never created.
     */
    private Mono<Void> shutdownEventLoopGroup() {
        return Mono.defer(() -> {
            EventLoopGroup group = this.eventLoopGroup;
            if (group == null) {
                return Mono.empty();
            }
            // NOTE(review): if FutureMono.from requires a Future<Void> argument in the
            // reactor-netty version in use, shutdownGracefully()'s result needs an unchecked
            // cast here; confirm against the library signatures.
            return FutureMono.from(group.shutdownGracefully());
        });
    }
}
