package io.scalecube.services.gateway.rsocket;

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.gateway.GatewaySession;
import io.scalecube.services.gateway.ReferenceCountUtil;
import io.scalecube.services.gateway.ServiceMessageCodec;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/scalecube/services/gateway/rsocket/RSocketGatewaySession.class */
public final class RSocketGatewaySession extends AbstractRSocket implements GatewaySession {
    private static final AtomicLong SESSION_ID_GENERATOR = new AtomicLong(System.currentTimeMillis());
    private final ServiceCall serviceCall;
    private final ServiceMessageCodec messageCodec;
    private final long sessionId = SESSION_ID_GENERATOR.incrementAndGet();
    private final BiFunction<GatewaySession, ServiceMessage, ServiceMessage> messageMapper;
    private final Map<String, String> headers;

    public RSocketGatewaySession(ServiceCall serviceCall, ServiceMessageCodec serviceMessageCodec, Map<String, String> map, BiFunction<GatewaySession, ServiceMessage, ServiceMessage> biFunction) {
        this.serviceCall = serviceCall;
        this.messageCodec = serviceMessageCodec;
        this.messageMapper = biFunction;
        this.headers = Collections.unmodifiableMap(new HashMap(map));
    }

    @Override // io.scalecube.services.gateway.GatewaySession
    public long sessionId() {
        return this.sessionId;
    }

    @Override // io.scalecube.services.gateway.GatewaySession
    public Map<String, String> headers() {
        return this.headers;
    }

    public Mono<Void> fireAndForget(Payload payload) {
        return Mono.defer(() -> {
            return this.serviceCall.oneWay(toMessage(payload));
        });
    }

    public Mono<Payload> requestResponse(Payload payload) {
        return Mono.defer(() -> {
            ServiceMessage message = toMessage(payload);
            return this.serviceCall.requestOne(message).doOnError(th -> {
                releaseRequestOnError(message);
            }).onErrorResume(th2 -> {
                return Mono.just(DefaultErrorMapper.INSTANCE.toMessage(message.qualifier(), th2));
            }).map(this::toPayload);
        });
    }

    public Flux<Payload> requestStream(Payload payload) {
        return Flux.defer(() -> {
            ServiceMessage message = toMessage(payload);
            return this.serviceCall.requestMany(message).doOnError(th -> {
                releaseRequestOnError(message);
            }).onErrorResume(th2 -> {
                return Mono.just(DefaultErrorMapper.INSTANCE.toMessage(message.qualifier(), th2));
            }).map(this::toPayload);
        });
    }

    private ServiceMessage toMessage(Payload payload) {
        try {
            ServiceMessage apply = this.messageMapper.apply(this, this.messageCodec.decode(payload.sliceData().retain(), payload.sliceMetadata().retain()));
            payload.release();
            return apply;
        } catch (Throwable th) {
            payload.release();
            throw th;
        }
    }

    private Payload toPayload(ServiceMessage serviceMessage) {
        return (Payload) this.messageCodec.encodeAndTransform(serviceMessage, ByteBufPayload::create);
    }

    private void releaseRequestOnError(ServiceMessage serviceMessage) {
        ReferenceCountUtil.safestRelease(serviceMessage.data());
    }

    public String toString() {
        return "RSocketGatewaySession[" + this.sessionId + "]";
    }
}
