package io.scalecube.gateway.rsocket.websocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.codec.HeadersCodec;
import io.scalecube.services.codec.ServiceMessageCodec;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/gateway/rsocket/websocket/RSocketWebsocketAcceptor.class */
/**
 * {@link SocketAcceptor} that bridges incoming RSocket interactions to a {@link ServiceCall}.
 *
 * <p>For every accepted connection a responder {@link RSocket} is created which decodes each
 * incoming {@link Payload} into a {@link ServiceMessage}, forwards it to the wrapped
 * {@link ServiceCall}, and encodes every resulting {@link ServiceMessage} back into a
 * {@link Payload}.
 */
public class RSocketWebsocketAcceptor implements SocketAcceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketWebsocketAcceptor.class);

    /** Target of all forwarded requests; assigned once in the constructor and never replaced. */
    private final ServiceCall serviceCall;

    /**
     * Creates an acceptor that forwards all requests to the given service call.
     *
     * @param serviceCall the service call that every interaction is delegated to
     */
    public RSocketWebsocketAcceptor(ServiceCall serviceCall) {
        this.serviceCall = serviceCall;
    }

    /**
     * Logs the accepted connection and returns the responder socket for it.
     *
     * <p>The message codec is built once per connection from the headers codec selected by the
     * setup payload's metadata MIME type, and is shared by all handlers of the returned socket.
     *
     * @param connectionSetupPayload setup frame of the connection; its metadata MIME type selects
     *     the headers codec
     * @param rSocket the requester side socket of the connection (only logged here)
     * @return a {@link Mono} emitting the responder {@link RSocket}
     */
    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
        LOGGER.info("Accepted rSocket websocket: {}, connectionSetup: {}", rSocket, connectionSetupPayload);
        final ServiceMessageCodec serviceMessageCodec = new ServiceMessageCodec(HeadersCodec.getInstance(connectionSetupPayload.metadataMimeType()));
        return Mono.just(new AbstractRSocket() {
            /** Decodes the payload and delegates to {@code ServiceCall.oneWay}. */
            @Override
            public Mono<Void> fireAndForget(Payload payload) {
                return serviceCall.oneWay(toServiceMessage(payload));
            }

            /** Decodes the payload, delegates to {@code ServiceCall.requestOne} and encodes the reply. */
            @Override
            public Mono<Payload> requestResponse(Payload payload) {
                return serviceCall.requestOne(toServiceMessage(payload)).map(this::toPayload);
            }

            /** Decodes the payload, delegates to {@code ServiceCall.requestMany} and encodes each reply. */
            @Override
            public Flux<Payload> requestStream(Payload payload) {
                return serviceCall.requestMany(toServiceMessage(payload)).map(this::toPayload);
            }

            /**
             * Decodes every inbound payload, delegates the resulting stream to
             * {@code ServiceCall.requestBidirectional} and encodes each reply.
             */
            @Override
            public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
                return serviceCall.requestBidirectional(Flux.from(publisher).map(this::toServiceMessage)).map(this::toPayload);
            }

            /** Decodes a service message from the payload's data and metadata slices. */
            private ServiceMessage toServiceMessage(Payload payload) {
                // NOTE(review): the payload is not released here; confirm buffer ownership
                // against the rsocket-java version in use.
                return serviceMessageCodec.decode(payload.sliceData(), payload.sliceMetadata());
            }

            /** Encodes a service message and wraps the encoded buffers via {@code ByteBufPayload.create}. */
            private Payload toPayload(ServiceMessage serviceMessage) {
                return (Payload) serviceMessageCodec.encodeAndTransform(serviceMessage, ByteBufPayload::create);
            }
        });
    }
}
