package io.scalecube.services.transport.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.transport.api.ClientChannel;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import java.lang.reflect.Type;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/transport/rsocket/RSocketClientChannel.class */
public class RSocketClientChannel implements ClientChannel {
    private final Mono<RSocket> rsocket;
    private final ServiceMessageCodec messageCodec;

    public RSocketClientChannel(Mono<RSocket> mono, ServiceMessageCodec serviceMessageCodec) {
        this.rsocket = mono;
        this.messageCodec = serviceMessageCodec;
    }

    public Mono<ServiceMessage> requestResponse(ServiceMessage serviceMessage, Type type) {
        return this.rsocket.flatMap(rSocket -> {
            return rSocket.requestResponse(toPayload(serviceMessage));
        }).map(this::toMessage).map(serviceMessage2 -> {
            return ServiceMessageCodec.decodeData(serviceMessage2, type);
        });
    }

    public Flux<ServiceMessage> requestStream(ServiceMessage serviceMessage, Type type) {
        return this.rsocket.flatMapMany(rSocket -> {
            return rSocket.requestStream(toPayload(serviceMessage));
        }).map(this::toMessage).map(serviceMessage2 -> {
            return ServiceMessageCodec.decodeData(serviceMessage2, type);
        });
    }

    public Flux<ServiceMessage> requestChannel(Publisher<ServiceMessage> publisher, Type type) {
        return this.rsocket.flatMapMany(rSocket -> {
            return rSocket.requestChannel(Flux.from(publisher).map(this::toPayload));
        }).map(this::toMessage).map(serviceMessage -> {
            return ServiceMessageCodec.decodeData(serviceMessage, type);
        });
    }

    private Payload toPayload(ServiceMessage serviceMessage) {
        return (Payload) this.messageCodec.encodeAndTransform(serviceMessage, ByteBufPayload::create);
    }

    private ServiceMessage toMessage(Payload payload) {
        try {
            return this.messageCodec.decode(payload.sliceData().retain(), payload.sliceMetadata().retain());
        } finally {
            payload.release();
        }
    }
}
