package io.scalecube.services.transport.rsocket;

import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.scalecube.net.Address;
import io.scalecube.services.transport.api.ClientChannel;
import io.scalecube.services.transport.api.ClientTransport;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketClientTransport.class */
public class RSocketClientTransport implements ClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientTransport.class);
    private final ThreadLocal<Map<Address, Mono<RSocket>>> rsockets = ThreadLocal.withInitial(ConcurrentHashMap::new);
    private final ServiceMessageCodec messageCodec;
    private final RSocketClientTransportFactory clientTransportFactory;

    public RSocketClientTransport(ServiceMessageCodec serviceMessageCodec, RSocketClientTransportFactory rSocketClientTransportFactory) {
        this.messageCodec = serviceMessageCodec;
        this.clientTransportFactory = rSocketClientTransportFactory;
    }

    public ClientChannel create(Address address) {
        Map<Address, Mono<RSocket>> map = this.rsockets.get();
        return new RSocketClientChannel(map.computeIfAbsent(address, address2 -> {
            return connect(address2, map);
        }), this.messageCodec);
    }

    private Mono<RSocket> connect(Address address, Map<Address, Mono<RSocket>> map) {
        return RSocketConnector.create().payloadDecoder(PayloadDecoder.DEFAULT).connect(() -> {
            return this.clientTransportFactory.clientTransport(address);
        }).doOnSuccess(rSocket -> {
            LOGGER.debug("[rsocket][client] Connected successfully on {}", address);
            rSocket.onClose().doFinally(signalType -> {
                map.remove(address);
                LOGGER.debug("[rsocket][client] Connection closed on {}", address);
            }).doOnError(th -> {
                LOGGER.warn("[rsocket][client][onClose] Exception occurred: {}", th.toString());
            }).subscribe();
        }).doOnError(th -> {
            LOGGER.warn("[rsocket][client] Failed to connect on {}, cause: {}", address, th.toString());
            map.remove(address);
        }).cache();
    }

    public String toString() {
        return new StringJoiner(", ", RSocketClientTransport.class.getSimpleName() + "[", "]").add("messageCodec=" + this.messageCodec).toString();
    }
}
