package io.scalecube.services.transport.rsocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.net.Address;
import io.scalecube.services.ServiceReference;
import io.scalecube.services.auth.ConnectionSetup;
import io.scalecube.services.auth.ConnectionSetupCodec;
import io.scalecube.services.auth.CredentialsSupplier;
import io.scalecube.services.exceptions.MessageCodecException;
import io.scalecube.services.exceptions.ServiceException;
import io.scalecube.services.exceptions.UnauthorizedException;
import io.scalecube.services.transport.api.ClientChannel;
import io.scalecube.services.transport.api.ClientTransport;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import io.scalecube.utils.MaskUtil;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketClientTransport.class */
public class RSocketClientTransport implements ClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientTransport.class);
    private final ThreadLocal<Map<Address, Mono<RSocket>>> rsockets = ThreadLocal.withInitial(ConcurrentHashMap::new);
    private final CredentialsSupplier credentialsSupplier;
    private final ConnectionSetupCodec connectionSetupCodec;
    private final HeadersCodec headersCodec;
    private final Collection<DataCodec> dataCodecs;
    private final RSocketClientTransportFactory clientTransportFactory;

    public RSocketClientTransport(CredentialsSupplier credentialsSupplier, ConnectionSetupCodec connectionSetupCodec, HeadersCodec headersCodec, Collection<DataCodec> collection, RSocketClientTransportFactory rSocketClientTransportFactory) {
        this.credentialsSupplier = credentialsSupplier;
        this.connectionSetupCodec = connectionSetupCodec;
        this.headersCodec = headersCodec;
        this.dataCodecs = collection;
        this.clientTransportFactory = rSocketClientTransportFactory;
    }

    public ClientChannel create(ServiceReference serviceReference) {
        Map<Address, Mono<RSocket>> map = this.rsockets.get();
        return new RSocketClientChannel(map.computeIfAbsent(serviceReference.address(), address -> {
            return getCredentials(serviceReference).flatMap(map2 -> {
                return connect(address, map2, map);
            }).cache().doOnError(th -> {
            });
        }), new ServiceMessageCodec(this.headersCodec, this.dataCodecs));
    }

    private Mono<Map<String, String>> getCredentials(ServiceReference serviceReference) {
        return Mono.defer(() -> {
            return this.credentialsSupplier == null ? Mono.just(Collections.emptyMap()) : ((Mono) this.credentialsSupplier.apply(serviceReference)).switchIfEmpty(Mono.just(Collections.emptyMap())).doOnSuccess(map -> {
                LOGGER.debug("[credentialsSupplier] Got credentials ({}) for service: {}", mask(map), serviceReference);
            }).doOnError(th -> {
                LOGGER.error("[credentialsSupplier] Failed to get credentials for service: {}, cause: {}", serviceReference, th.toString());
            }).onErrorMap(this::toUnauthorizedException);
        });
    }

    private Mono<RSocket> connect(Address address, Map<String, String> map, Map<Address, Mono<RSocket>> map2) {
        return RSocketConnector.create().payloadDecoder(PayloadDecoder.DEFAULT).setupPayload(encodeConnectionSetup(new ConnectionSetup(map))).connect(() -> {
            return this.clientTransportFactory.clientTransport(address);
        }).doOnSuccess(rSocket -> {
            LOGGER.debug("[rsocket][client][{}] Connected successfully", address);
            rSocket.onClose().doFinally(signalType -> {
                map2.remove(address);
                LOGGER.debug("[rsocket][client][{}] Connection closed", address);
            }).doOnError(th -> {
                LOGGER.warn("[rsocket][client][{}][onClose] Exception occurred: {}", address, th.toString());
            }).subscribe();
        }).doOnError(th -> {
            LOGGER.warn("[rsocket][client][{}] Failed to connect, cause: {}", address, th.toString());
        });
    }

    private static Map<String, String> mask(Map<String, String> map) {
        return (Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return MaskUtil.mask((String) entry.getValue());
        }));
    }

    private Payload encodeConnectionSetup(ConnectionSetup connectionSetup) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
        try {
            this.connectionSetupCodec.encode(new ByteBufOutputStream(buffer), connectionSetup);
            return ByteBufPayload.create(buffer);
        } catch (Throwable th) {
            ReferenceCountUtil.safestRelease(buffer);
            LOGGER.error("Failed to encode connectionSetup: {}, cause: {}", connectionSetup, th.toString());
            throw new MessageCodecException("Failed to encode ConnectionSetup", th);
        }
    }

    private UnauthorizedException toUnauthorizedException(Throwable th) {
        if (!(th instanceof ServiceException)) {
            return new UnauthorizedException(th);
        }
        ServiceException serviceException = (ServiceException) th;
        return new UnauthorizedException(serviceException.errorCode(), serviceException.getMessage());
    }
}
