package io.scalecube.services.gateway.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.gateway.GatewayMetrics;
import io.scalecube.services.gateway.GatewaySession;
import io.scalecube.services.gateway.ServiceMessageCodec;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/gateway/rsocket/RSocketGatewaySession.class */
/**
 * {@link GatewaySession} that serves requests arriving over an RSocket connection.
 *
 * <p>Each incoming {@link Payload} is decoded into a {@link ServiceMessage}, passed through the
 * configured message mapper, and dispatched via {@link ServiceCall}. Responses are encoded back
 * into payloads. Request and response counts are reported to {@link GatewayMetrics}.
 *
 * <p>Header lookups are not supported by this session type.
 */
public final class RSocketGatewaySession extends AbstractRSocket implements GatewaySession {
    /** Process-wide id source, seeded with the wall-clock time at class initialization. */
    private static final AtomicLong SESSION_ID_GENERATOR = new AtomicLong(System.currentTimeMillis());

    private final ServiceCall serviceCall;
    private final GatewayMetrics metrics;
    private final ServiceMessageCodec messageCodec;
    private final long sessionId = SESSION_ID_GENERATOR.incrementAndGet();
    private final BiFunction<GatewaySession, ServiceMessage, ServiceMessage> messageMapper;

    /**
     * Creates a session with a freshly generated session id.
     *
     * @param serviceCall used to dispatch decoded requests
     * @param gatewayMetrics receives request/response marks
     * @param serviceMessageCodec decodes incoming payloads and encodes outgoing messages
     * @param biFunction mapper applied to every decoded request together with this session
     */
    public RSocketGatewaySession(ServiceCall serviceCall, GatewayMetrics gatewayMetrics, ServiceMessageCodec serviceMessageCodec, BiFunction<GatewaySession, ServiceMessage, ServiceMessage> biFunction) {
        this.serviceCall = serviceCall;
        this.metrics = gatewayMetrics;
        this.messageCodec = serviceMessageCodec;
        this.messageMapper = biFunction;
    }

    /**
     * Returns the id assigned to this session at construction time.
     *
     * @return session id
     */
    @Override
    public long sessionId() {
        return this.sessionId;
    }

    /**
     * Not supported by this session type.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public String headerValue(String str) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this session type.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<String> headerValues(String str) {
        throw new UnsupportedOperationException();
    }

    /**
     * Handles a one-way request. Nothing happens until the returned {@link Mono} is subscribed to;
     * on subscription the request is marked in the metrics and dispatched via
     * {@link ServiceCall#oneWay}.
     *
     * @param payload incoming request; released once it has been decoded
     * @return the result of the one-way call
     */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        return Mono.defer(() -> {
            this.metrics.markRequest();
            return this.serviceCall.oneWay(toMessage(payload));
        });
    }

    /**
     * Handles a request expecting a single response. Work is deferred until subscription.
     *
     * <p>An error raised by the service call is converted into an error message by
     * {@link DefaultErrorMapper} and emitted as a regular payload instead of an error signal.
     * Every emitted payload, including such an error payload, is counted as a service response.
     *
     * @param payload incoming request; released once it has been decoded
     * @return the encoded response
     */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        return Mono.defer(() -> {
            this.metrics.markRequest();
            return this.serviceCall
                .requestOne(toMessage(payload))
                .onErrorResume(error -> Mono.just(DefaultErrorMapper.INSTANCE.toMessage(error)))
                .map(this::toPayload)
                .doOnNext(response -> this.metrics.markServiceResponse());
        });
    }

    /**
     * Handles a request expecting a stream of responses. Work is deferred until subscription.
     *
     * <p>An error raised by the service call is converted into a single error message by
     * {@link DefaultErrorMapper}, emitted as the last payload of the stream. Every emitted
     * payload, including such an error payload, is counted as a service response.
     *
     * @param payload incoming request; released once it has been decoded
     * @return the encoded responses
     */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return Flux.defer(() -> {
            this.metrics.markRequest();
            return this.serviceCall
                .requestMany(toMessage(payload))
                .onErrorResume(error -> Mono.just(DefaultErrorMapper.INSTANCE.toMessage(error)))
                .map(this::toPayload)
                .doOnNext(response -> this.metrics.markServiceResponse());
        });
    }

    /**
     * Decodes the payload into a {@link ServiceMessage} and applies the message mapper.
     *
     * <p>The data and metadata slices are retained before being handed to the codec, and the
     * payload itself is released exactly once whether decoding succeeds or throws.
     * NOTE(review): the slices presumably share the payload's buffers, which is why they are
     * retained before the payload is released; confirm against the RSocket Payload contract.
     */
    private ServiceMessage toMessage(Payload payload) {
        try {
            ServiceMessage decoded =
                this.messageCodec.decode(payload.sliceData().retain(), payload.sliceMetadata().retain());
            return this.messageMapper.apply(this, decoded);
        } finally {
            payload.release();
        }
    }

    /** Encodes the message through the codec and wraps the result via {@code ByteBufPayload::create}. */
    private Payload toPayload(ServiceMessage serviceMessage) {
        return (Payload) this.messageCodec.encodeAndTransform(serviceMessage, ByteBufPayload::create);
    }

    @Override
    public String toString() {
        return "RSocketGatewaySession[" + this.sessionId + "]";
    }
}
