package io.scalecube.services.transport.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.codec.ServiceMessageCodec;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.transport.api.ClientChannel;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketServiceClientAdapter.class */
/**
 * {@link ClientChannel} implementation that sends {@link ServiceMessage}s over an
 * {@link RSocket} connection.
 *
 * <p>Outgoing messages are encoded to RSocket {@link Payload}s and incoming payloads are decoded
 * back to {@link ServiceMessage}s using the supplied {@link ServiceMessageCodec}. Every request is
 * combined (via {@code takeUntilOther}) with a companion publisher that fails with
 * {@link ConnectionClosedException} once {@link RSocket#onClose()} completes.
 */
public class RSocketServiceClientAdapter implements ClientChannel {
    private final Mono<RSocket> rsocket;
    private final ServiceMessageCodec messageCodec;

    /**
     * Creates an adapter over the given connection.
     *
     * @param mono source of the {@link RSocket} used for every request
     * @param serviceMessageCodec codec used to encode requests and decode responses
     */
    public RSocketServiceClientAdapter(Mono<RSocket> mono, ServiceMessageCodec serviceMessageCodec) {
        this.rsocket = mono;
        this.messageCodec = serviceMessageCodec;
    }

    /**
     * Sends a single request and returns the decoded single response.
     *
     * @param serviceMessage request message to encode and send
     * @return the decoded response message
     */
    @Override
    public Mono<ServiceMessage> requestResponse(ServiceMessage serviceMessage) {
        return this.rsocket
                .flatMap(rSocket -> rSocket.requestResponse(toPayload(serviceMessage))
                        .takeUntilOther(listenConnectionClose(rSocket)))
                .map(this::toMessage);
    }

    /**
     * Sends a single request and returns the decoded stream of responses.
     *
     * @param serviceMessage request message to encode and send
     * @return the decoded response messages
     */
    @Override
    public Flux<ServiceMessage> requestStream(ServiceMessage serviceMessage) {
        return this.rsocket
                .flatMapMany(rSocket -> rSocket.requestStream(toPayload(serviceMessage))
                        .takeUntilOther(listenConnectionClose(rSocket)))
                .map(this::toMessage);
    }

    /**
     * Sends a stream of requests and returns the decoded stream of responses.
     *
     * @param publisher request messages; each one is encoded before being sent
     * @return the decoded response messages
     */
    @Override
    public Flux<ServiceMessage> requestChannel(Publisher<ServiceMessage> publisher) {
        return this.rsocket
                .flatMapMany(rSocket -> rSocket.requestChannel(Flux.from(publisher).map(this::toPayload))
                        .takeUntilOther(listenConnectionClose(rSocket)))
                .map(this::toMessage);
    }

    /** Encodes a message with the codec and wraps the result via {@code ByteBufPayload.create}. */
    private Payload toPayload(ServiceMessage serviceMessage) {
        return (Payload) this.messageCodec.encodeAndTransform(serviceMessage, ByteBufPayload::create);
    }

    /** Decodes a payload's data and metadata slices into a message. */
    private ServiceMessage toMessage(Payload payload) {
        return this.messageCodec.decode(payload.sliceData(), payload.sliceMetadata());
    }

    /**
     * Builds the companion publisher passed to {@code takeUntilOther}: it never emits a value and
     * fails with {@link ConnectionClosedException} when {@code rSocket.onClose()} completes. An
     * error from {@code onClose()} itself is propagated unchanged.
     */
    private <T> Mono<T> listenConnectionClose(RSocket rSocket) {
        // onClose() is a Mono<Void>, so it can only complete or fail; then(...) re-types the chain
        // to Mono<T> without an unchecked cast and switches to the error on completion.
        return rSocket.onClose().then(Mono.<T>defer(this::toConnectionClosedException));
    }

    /** Creates a failed {@link Mono} carrying a new {@link ConnectionClosedException}. */
    private <T> Mono<T> toConnectionClosedException() {
        return Mono.error(new ConnectionClosedException("Connection closed"));
    }
}
