package io.scalecube.services.gateway.ws;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.scalecube.services.gateway.GatewaySession;
import io.scalecube.services.gateway.GatewaySessionHandler;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;

/* loaded from: input_file:io/scalecube/services/gateway/ws/WebsocketGatewaySession.class */
/**
 * {@link GatewaySession} implementation backed by a single websocket connection.
 *
 * <p>The session wraps the websocket inbound/outbound pair, encodes outgoing {@link GatewayMessage}s
 * with the supplied {@link GatewayMessageCodec}, and tracks the per-stream subscriptions
 * ({@code sid -> Disposable}) opened on this connection. All tracked subscriptions are disposed
 * when the underlying connection is disposed.
 */
public final class WebsocketGatewaySession implements GatewaySession {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGatewaySession.class);

    /** Content type used when the handshake request carries no {@code Content-Type} header. */
    private static final String DEFAULT_CONTENT_TYPE = "application/json";

    /** Source of session ids; seeded with the wall-clock time at class initialization. */
    private static final AtomicLong SESSION_ID_GENERATOR = new AtomicLong(System.currentTimeMillis());

    /** Active subscriptions of this session keyed by stream id (sid). */
    private final Map<Long, Disposable> subscriptions = new NonBlockingHashMapLong<>(1024);

    private final long sessionId = SESSION_ID_GENERATOR.incrementAndGet();

    private final GatewaySessionHandler<GatewayMessage> gatewayHandler;
    private final WebsocketInbound inbound;
    private final WebsocketOutbound outbound;
    private final GatewayMessageCodec codec;
    private final String contentType;

    /**
     * Creates a session for an accepted websocket connection.
     *
     * @param gatewayMessageCodec codec used to encode outgoing messages
     * @param httpServerRequest handshake request; its {@code Content-Type} header (if any) becomes
     *     the session content type
     * @param websocketInbound inbound side of the websocket
     * @param websocketOutbound outbound side of the websocket
     * @param gatewaySessionHandler handler notified about responses and send errors
     */
    public WebsocketGatewaySession(GatewayMessageCodec gatewayMessageCodec, HttpServerRequest httpServerRequest, WebsocketInbound websocketInbound, WebsocketOutbound websocketOutbound, GatewaySessionHandler<GatewayMessage> gatewaySessionHandler) {
        this.codec = gatewayMessageCodec;
        this.contentType = Optional.ofNullable(httpServerRequest.requestHeaders().get(HttpHeaderNames.CONTENT_TYPE)).orElse(DEFAULT_CONTENT_TYPE);
        // Dispose every tracked subscription once the connection itself is disposed.
        this.inbound = websocketInbound.withConnection(connection -> connection.onDispose(this::clearSubscriptions));
        this.outbound = websocketOutbound;
        this.gatewayHandler = gatewaySessionHandler;
    }

    /**
     * Returns the id assigned to this session at construction time.
     *
     * @return session id
     */
    @Override // io.scalecube.services.gateway.GatewaySession
    public long sessionId() {
        return this.sessionId;
    }

    /**
     * Returns the content type taken from the handshake request, or {@code application/json} when
     * the request had no {@code Content-Type} header.
     *
     * @return session content type
     */
    public String contentType() {
        return this.contentType;
    }

    /**
     * Returns the stream of payloads received on this websocket.
     *
     * <p>Frames are aggregated, ping and pong frames are dropped, and every remaining frame is
     * retained before its content is emitted.
     * NOTE(review): because of the {@code retain()} the subscriber presumably has to release each
     * emitted buffer - confirm against the consumer.
     *
     * @return flux of frame contents
     */
    public Flux<ByteBuf> receive() {
        return this.inbound
            .aggregateFrames()
            .receiveFrames()
            .filter(frame -> !(frame instanceof PongWebSocketFrame || frame instanceof PingWebSocketFrame))
            .map(frame -> frame.retain().content());
    }

    /**
     * Encodes the given message and sends it to the client as a text websocket frame.
     *
     * <p>The gateway handler's {@code onResponse} is invoked with the encoded frame content as the
     * frame is emitted towards the outbound, and {@code onError} is invoked if sending fails. The
     * error is still propagated to the subscriber of the returned mono.
     *
     * @param gatewayMessage message to send
     * @return mono that completes when the send completes
     */
    public Mono<Void> send(GatewayMessage gatewayMessage) {
        return Mono.deferWithContext(context -> {
            Mono<TextWebSocketFrame> frames = Mono.just(gatewayMessage)
                .map(this.codec::encode)
                .map(TextWebSocketFrame::new)
                .doOnNext(frame -> this.gatewayHandler.onResponse(this, frame.content(), gatewayMessage, context));
            // The always-true predicate requests a flush after every written frame.
            return this.outbound
                .sendObject(frames, frame -> true)
                .then()
                .doOnError(th -> this.gatewayHandler.onError(this, th, context));
        });
    }

    /**
     * Sends a websocket close frame without an explicit status or reason.
     *
     * @return mono that completes when the close frame has been sent
     */
    public Mono<Void> close() {
        return this.outbound.sendClose().then();
    }

    /**
     * Sends a websocket close frame with status code 1000 and the given reason.
     *
     * @param str reason text placed in the close frame
     * @return mono that completes when the close frame has been sent
     */
    public Mono<Void> close(String str) {
        return this.outbound.sendClose(1000, str).then();
    }

    /**
     * Registers a callback to run when the connection is disposed and exposes the connection's
     * termination as a mono.
     *
     * @param disposable callback attached to the connection via {@code onDispose}
     * @return mono that completes when the connection's {@code onTerminate()} completes and fails
     *     if it signals an error
     */
    public Mono<Void> onClose(Disposable disposable) {
        return Mono.create(monoSink ->
            this.inbound.withConnection(connection ->
                connection
                    .onDispose(disposable)
                    .onTerminate()
                    // Bridge every terminal signal of the connection to the sink.
                    .subscribe(monoSink::success, monoSink::error, monoSink::success)));
    }

    /**
     * Removes and disposes the subscription registered under the given stream id.
     *
     * @param l stream id; {@code null} is tolerated and treated as "not found"
     * @return {@code true} if a subscription was registered under the id and has been disposed
     */
    public boolean dispose(Long l) {
        if (l == null) {
            return false;
        }
        Disposable removed = this.subscriptions.remove(l);
        if (removed == null) {
            return false;
        }
        LOGGER.debug("Dispose subscription by sid={}, session={}", l, this.sessionId);
        removed.dispose();
        return true;
    }

    /**
     * Tells whether a subscription is currently registered under the given stream id.
     *
     * @param l stream id; may be {@code null}
     * @return {@code true} if the id is non-null and registered
     */
    public boolean containsSid(Long l) {
        return l != null && this.subscriptions.containsKey(l);
    }

    /**
     * Registers a subscription under the given stream id unless the subscription is already
     * disposed or the id is already taken.
     *
     * @param l stream id
     * @param disposable subscription to track
     * @return {@code true} if the subscription was stored
     */
    public boolean register(Long l, Disposable disposable) {
        boolean registered = !disposable.isDisposed() && this.subscriptions.putIfAbsent(l, disposable) == null;
        if (registered) {
            LOGGER.debug("Registered subscription with sid={}, session={}", l, this.sessionId);
        }
        return registered;
    }

    /** Disposes every tracked subscription and empties the registry. */
    private void clearSubscriptions() {
        // Read the size once so the chosen log branch and the logged count cannot disagree.
        int count = this.subscriptions.size();
        if (count > 1) {
            LOGGER.debug("Clear all {} subscriptions on session={}", count, this.sessionId);
        } else if (count == 1) {
            LOGGER.debug("Clear 1 subscription on session={}", this.sessionId);
        }
        this.subscriptions.forEach((sid, subscription) -> subscription.dispose());
        this.subscriptions.clear();
    }

    @Override
    public String toString() {
        return "WebsocketGatewaySession[" + this.sessionId + "]";
    }
}
