package io.scalecube.services.transport.rsocket.client;

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.scalecube.services.codec.ServiceMessageCodec;
import io.scalecube.services.transport.client.api.ClientChannel;
import io.scalecube.services.transport.client.api.ClientTransport;
import io.scalecube.transport.Address;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.options.ClientOptions;
import reactor.ipc.netty.tcp.TcpClient;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/client/RSocketClientTransport.class */
/**
 * {@link ClientTransport} implementation that hands out {@link ClientChannel}s backed by RSocket
 * connections over TCP.
 *
 * <p>Connections are kept in a per-instance map keyed by {@link Address}: the first
 * {@link #create(Address)} call for an address registers a cached {@code Mono<RSocket>}, and later
 * calls for the same address reuse it. An entry is removed again when the connect attempt fails or
 * when the socket's {@code onClose()} signal terminates.
 */
public class RSocketClientTransport implements ClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientTransport.class);

    /** Connection pool: one cached connection {@code Mono} per remote address. */
    private final Map<Address, Mono<RSocket>> rSockets = new ConcurrentHashMap<>();

    /** Codec passed on to every channel adapter created by this transport. */
    private final ServiceMessageCodec codec;

    /**
     * Creates a transport that uses the given codec for all channels it creates.
     *
     * @param serviceMessageCodec codec handed to each {@code RSocketServiceClientAdapter}
     */
    public RSocketClientTransport(ServiceMessageCodec serviceMessageCodec) {
        this.codec = serviceMessageCodec;
    }

    /**
     * Returns a client channel for the given address, reusing the pooled connection {@code Mono}
     * for that address when one is already registered.
     *
     * @param address remote address to connect to
     * @return a new channel adapter wrapping the pooled connection {@code Mono} and this
     *     transport's codec
     */
    @Override
    public ClientChannel create(Address address) {
        Mono<RSocket> rSocket = rSockets.computeIfAbsent(address, key -> connect(key, rSockets));
        return new RSocketServiceClientAdapter(rSocket, codec);
    }

    /**
     * Builds the cached connection {@code Mono} for {@code address} and wires the callbacks that
     * evict it from {@code map}.
     *
     * <p>Kept {@code public} for compatibility with existing callers; a decompiler note in the
     * original indicates this method was declared {@code private} in the source.
     *
     * @param address remote address; its host and port configure the TCP client
     * @param map pool from which the entry for {@code address} is removed on connect failure or
     *     when the connection closes
     * @return the connection {@code Mono} with {@code cache()} applied
     */
    public static Mono<RSocket> connect(Address address, Map<Address, Mono<RSocket>> map) {
        Consumer<? super ClientOptions.Builder<?>> tcpOptions =
            builder -> builder.disablePool().host(address.host()).port(address.port());
        TcpClientTransport transport = TcpClientTransport.create(TcpClient.create(tcpOptions));
        Mono<RSocket> connection = RSocketFactory.connect().transport(transport).start();

        return connection
            .doOnSuccess(rSocket -> {
                LOGGER.info("Connected successfully on {}", address);
                // Evict the pooled entry once the established connection terminates.
                rSocket.onClose()
                    .doOnTerminate(() -> {
                        map.remove(address);
                        LOGGER.info("Connection closed on {} and removed from the pool", address);
                    })
                    .subscribe();
            })
            .doOnError(th -> {
                // NOTE(review): how SLF4J renders a trailing Throwable that also lines up with a
                // placeholder depends on the SLF4J version - confirm the intended output.
                LOGGER.warn("Connect failed on {}, cause: {}", address, th);
                // Drop the failed entry so that a later create() triggers a fresh connect.
                map.remove(address);
            })
            .cache();
    }
}
