package io.scalecube.services.gateway.ws;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.ExceptionProcessor;
import io.scalecube.services.gateway.ws.GatewayMessage;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;

/* loaded from: input_file:io/scalecube/services/gateway/ws/WebsocketSession.class */
public final class WebsocketSession {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSession.class);
    private static final String DEFAULT_CONTENT_TYPE = "application/json";
    private final WebsocketInbound inbound;
    private final WebsocketOutbound outbound;
    private final GatewayMessageCodec messageCodec;
    private final String contentType;
    private final Map<Long, Disposable> subscriptions = new NonBlockingHashMapLong(1024);
    private final String id = Integer.toHexString(System.identityHashCode(this));

    public WebsocketSession(GatewayMessageCodec gatewayMessageCodec, HttpServerRequest httpServerRequest, WebsocketInbound websocketInbound, WebsocketOutbound websocketOutbound) {
        this.messageCodec = gatewayMessageCodec;
        this.contentType = (String) Optional.ofNullable(httpServerRequest.requestHeaders().get(HttpHeaderNames.CONTENT_TYPE)).orElse(DEFAULT_CONTENT_TYPE);
        this.inbound = websocketInbound.withConnection(connection -> {
            connection.onDispose(this::clearSubscriptions);
        });
        this.outbound = websocketOutbound.options((v0) -> {
            v0.flushOnEach();
        });
    }

    public String id() {
        return this.id;
    }

    public String contentType() {
        return this.contentType;
    }

    public Flux<ByteBuf> receive() {
        return this.inbound.aggregateFrames().receive().retain();
    }

    public Mono<Void> send(GatewayMessage gatewayMessage) {
        Mono just = Mono.just(gatewayMessage);
        GatewayMessageCodec gatewayMessageCodec = this.messageCodec;
        gatewayMessageCodec.getClass();
        return send(just.map(gatewayMessageCodec::encode)).doOnSuccessOrError((r6, th) -> {
            logSend(gatewayMessage, th);
        });
    }

    public Mono<Void> send(Throwable th, Long l) {
        Mono map = Mono.just(th).map(ExceptionProcessor::toMessage).map(serviceMessage -> {
            return toErrorMessage(l, serviceMessage);
        });
        GatewayMessageCodec gatewayMessageCodec = this.messageCodec;
        gatewayMessageCodec.getClass();
        return send(map.map(gatewayMessageCodec::encode)).doOnSuccessOrError((r8, th2) -> {
            logSend(th, l, th2);
        });
    }

    private Mono<Void> send(Mono<ByteBuf> mono) {
        return mono.map(TextWebSocketFrame::new).flatMap(textWebSocketFrame -> {
            return this.outbound.sendObject(textWebSocketFrame).then();
        });
    }

    private void logSend(GatewayMessage gatewayMessage, Throwable th) {
        if (th == null) {
            LOGGER.debug("<< SEND success: {}, session={}", gatewayMessage, this.id);
        } else {
            LOGGER.warn("<< SEND failed: {}, session={}, cause: {}", new Object[]{gatewayMessage, this.id, th});
        }
    }

    private void logSend(Throwable th, Long l, Throwable th2) {
        if (th2 == null) {
            LOGGER.debug("<< SEND success: {}, sid={}, session={}", new Object[]{th, l, this.id});
        } else {
            LOGGER.warn("<< SEND failed: {}, sid={}, session={}, cause: {}", new Object[]{th, l, this.id, th2});
        }
    }

    private GatewayMessage toErrorMessage(Long l, ServiceMessage serviceMessage) {
        GatewayMessage.Builder from = GatewayMessage.from(serviceMessage);
        Optional ofNullable = Optional.ofNullable(l);
        from.getClass();
        ofNullable.ifPresent(from::streamId);
        return from.signal(Signal.ERROR).build();
    }

    public Mono<Void> close() {
        return this.outbound.sendClose().then();
    }

    public Mono<Void> onClose(Disposable disposable) {
        return Mono.create(monoSink -> {
            this.inbound.withConnection(connection -> {
                Mono onTerminate = connection.onDispose(disposable).onTerminate();
                monoSink.getClass();
                Consumer consumer = (v1) -> {
                    r1.success(v1);
                };
                monoSink.getClass();
                Consumer consumer2 = monoSink::error;
                monoSink.getClass();
                onTerminate.subscribe(consumer, consumer2, monoSink::success);
            });
        });
    }

    public boolean dispose(Long l) {
        boolean z = false;
        if (l != null) {
            Disposable remove = this.subscriptions.remove(l);
            z = remove != null;
            if (z) {
                LOGGER.debug("Dispose subscription by sid={}, session={}", l, this.id);
                remove.dispose();
            }
        }
        return z;
    }

    public boolean containsSid(Long l) {
        return l != null && this.subscriptions.containsKey(l);
    }

    public boolean register(Long l, Disposable disposable) {
        boolean z = false;
        if (!disposable.isDisposed()) {
            z = this.subscriptions.putIfAbsent(l, disposable) == null;
        }
        if (z) {
            LOGGER.debug("Registered subscription with sid={}, session={}", l, this.id);
        }
        return z;
    }

    private void clearSubscriptions() {
        if (this.subscriptions.size() > 1) {
            LOGGER.info("Clear all {} subscriptions on session={}", Integer.valueOf(this.subscriptions.size()), this.id);
        } else if (this.subscriptions.size() == 1) {
            LOGGER.info("Clear 1 subscription on session={}", this.id);
        }
        this.subscriptions.forEach((l, disposable) -> {
            disposable.dispose();
        });
        this.subscriptions.clear();
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("WebsocketSession{");
        sb.append("id='").append(this.id).append('\'');
        sb.append(", contentType='").append(this.contentType).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
