package io.scalecube.services.gateway.ws;

import io.netty.buffer.ByteBuf;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.DefaultErrorMapper;
import io.scalecube.services.gateway.GatewayMetrics;
import io.scalecube.services.gateway.GatewaySessionHandler;
import io.scalecube.services.gateway.ReferenceCountUtil;
import io.scalecube.services.gateway.ws.GatewayMessage;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;
import reactor.util.context.Context;

/* loaded from: input_file:io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.class */
public class WebsocketGatewayAcceptor implements BiFunction<HttpServerRequest, HttpServerResponse, Publisher<Void>> {
    private final GatewayMessageCodec messageCodec = new GatewayMessageCodec();
    private final ServiceCall serviceCall;
    private final GatewayMetrics metrics;
    private final GatewaySessionHandler<GatewayMessage> gatewayHandler;

    public WebsocketGatewayAcceptor(ServiceCall serviceCall, GatewayMetrics gatewayMetrics, GatewaySessionHandler<GatewayMessage> gatewaySessionHandler) {
        this.serviceCall = (ServiceCall) Objects.requireNonNull(serviceCall, "serviceCall");
        this.metrics = (GatewayMetrics) Objects.requireNonNull(gatewayMetrics, "metrics");
        this.gatewayHandler = (GatewaySessionHandler) Objects.requireNonNull(gatewaySessionHandler, "gatewayHandler");
    }

    @Override // java.util.function.BiFunction
    public Publisher<Void> apply(HttpServerRequest httpServerRequest, HttpServerResponse httpServerResponse) {
        return httpServerResponse.sendWebsocket((websocketInbound, websocketOutbound) -> {
            return onConnect(new WebsocketGatewaySession(this.messageCodec, httpServerRequest, websocketInbound, websocketOutbound, this.gatewayHandler));
        });
    }

    private Mono<Void> onConnect(WebsocketGatewaySession websocketGatewaySession) {
        this.gatewayHandler.onSessionOpen(websocketGatewaySession);
        websocketGatewaySession.receive().doOnError(th -> {
            this.gatewayHandler.onSessionError(websocketGatewaySession, th);
        }).subscribe(byteBuf -> {
            Mono.deferWithContext(context -> {
                return onRequest(websocketGatewaySession, byteBuf, context);
            }).subscriberContext(context2 -> {
                return this.gatewayHandler.onRequest(websocketGatewaySession, byteBuf, context2);
            }).subscribe();
        });
        return websocketGatewaySession.onClose(() -> {
            this.gatewayHandler.onSessionClose(websocketGatewaySession);
        });
    }

    private Mono<GatewayMessage> onRequest(WebsocketGatewaySession websocketGatewaySession, ByteBuf byteBuf, Context context) {
        return Mono.fromCallable(() -> {
            return this.messageCodec.decode(byteBuf);
        }).doOnNext(gatewayMessage -> {
            this.metrics.markRequest();
        }).map(this::validateSid).flatMap(gatewayMessage2 -> {
            return onCancel(websocketGatewaySession, gatewayMessage2);
        }).map(obj -> {
            return validateSid(websocketGatewaySession, (GatewayMessage) obj);
        }).map(this::validateQualifier).map(gatewayMessage3 -> {
            return this.gatewayHandler.mapMessage(websocketGatewaySession, gatewayMessage3, context);
        }).doOnNext(gatewayMessage4 -> {
            onMessage(websocketGatewaySession, gatewayMessage4, context);
        }).doOnError(th -> {
            if (!(th instanceof WebsocketContextException)) {
                this.gatewayHandler.onError(websocketGatewaySession, th, context);
                return;
            }
            WebsocketContextException websocketContextException = (WebsocketContextException) th;
            websocketContextException.releaseRequest();
            onError(websocketGatewaySession, websocketContextException.request(), websocketContextException.getCause(), context);
        });
    }

    private void onMessage(WebsocketGatewaySession websocketGatewaySession, GatewayMessage gatewayMessage, Context context) {
        Long streamId = gatewayMessage.streamId();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Flux requestMany = this.serviceCall.requestMany(GatewayMessage.toServiceMessage(gatewayMessage));
        Optional ofNullable = Optional.ofNullable(gatewayMessage.rateLimit());
        Objects.requireNonNull(requestMany);
        Flux doOnNext = ((Flux) ofNullable.map((v1) -> {
            return r1.limitRate(v1);
        }).orElse(requestMany)).map(serviceMessage -> {
            return prepareResponse(streamId, serviceMessage, atomicBoolean);
        }).doOnNext(gatewayMessage2 -> {
            this.metrics.markServiceResponse();
        });
        Objects.requireNonNull(websocketGatewaySession);
        websocketGatewaySession.register(streamId, doOnNext.flatMap(websocketGatewaySession::send).doOnError(th -> {
            onError(websocketGatewaySession, gatewayMessage, th, context);
        }).doOnComplete(() -> {
            onComplete(websocketGatewaySession, gatewayMessage, atomicBoolean, context);
        }).doFinally(signalType -> {
            websocketGatewaySession.dispose(streamId);
        }).subscriberContext(context).subscribe());
    }

    private void onError(WebsocketGatewaySession websocketGatewaySession, GatewayMessage gatewayMessage, Throwable th, Context context) {
        GatewayMessage.Builder from = GatewayMessage.from(DefaultErrorMapper.INSTANCE.toMessage(th));
        Optional ofNullable = Optional.ofNullable(gatewayMessage.streamId());
        Objects.requireNonNull(from);
        ofNullable.ifPresent(from::streamId);
        websocketGatewaySession.send(from.signal(Signal.ERROR).build()).subscriberContext(context).subscribe();
    }

    private void onComplete(WebsocketGatewaySession websocketGatewaySession, GatewayMessage gatewayMessage, AtomicBoolean atomicBoolean, Context context) {
        if (atomicBoolean.get()) {
            return;
        }
        GatewayMessage.Builder builder = GatewayMessage.builder();
        Optional ofNullable = Optional.ofNullable(gatewayMessage.streamId());
        Objects.requireNonNull(builder);
        ofNullable.ifPresent(builder::streamId);
        websocketGatewaySession.send(builder.signal(Signal.COMPLETE).build()).subscriberContext(context).subscribe();
    }

    private Mono<?> onCancel(WebsocketGatewaySession websocketGatewaySession, GatewayMessage gatewayMessage) {
        if (!gatewayMessage.hasSignal(Signal.CANCEL)) {
            return Mono.just(gatewayMessage);
        }
        Optional.ofNullable(gatewayMessage.data()).ifPresent(ReferenceCountUtil::safestRelease);
        websocketGatewaySession.dispose(gatewayMessage.streamId());
        return websocketGatewaySession.send(GatewayMessage.builder().streamId(gatewayMessage.streamId()).signal(Signal.CANCEL).build());
    }

    private GatewayMessage validateQualifier(GatewayMessage gatewayMessage) {
        if (gatewayMessage.qualifier() == null) {
            throw WebsocketContextException.badRequest("qualifier is missing", gatewayMessage);
        }
        return gatewayMessage;
    }

    private GatewayMessage validateSid(WebsocketGatewaySession websocketGatewaySession, GatewayMessage gatewayMessage) {
        if (websocketGatewaySession.containsSid(gatewayMessage.streamId())) {
            throw WebsocketContextException.badRequest("sid=" + gatewayMessage.streamId() + " is already registered", gatewayMessage);
        }
        return gatewayMessage;
    }

    private GatewayMessage validateSid(GatewayMessage gatewayMessage) {
        if (gatewayMessage.streamId() == null) {
            throw WebsocketContextException.badRequest("sid is missing", gatewayMessage);
        }
        return gatewayMessage;
    }

    private GatewayMessage prepareResponse(Long l, ServiceMessage serviceMessage, AtomicBoolean atomicBoolean) {
        GatewayMessage.Builder streamId = GatewayMessage.from(serviceMessage).streamId(l);
        if (serviceMessage.isError()) {
            atomicBoolean.set(true);
            streamId.signal(Signal.ERROR);
        }
        return streamId.build();
    }
}
