package io.scalecube.services.gateway.transport.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.scalecube.services.api.ErrorData;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.Consumer;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.netty.Connection;

/* loaded from: input_file:io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.class */
public final class WebsocketGatewayClientSession {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGatewayClientSession.class);
    private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
    private static final String STREAM_ID = "sid";
    private static final String SIGNAL = "sig";
    private final GatewayClientCodec<ByteBuf> codec;
    private final Connection connection;
    private final Map<Long, Object> inboundProcessors = new NonBlockingHashMapLong(1024);
    private final String id = Integer.toHexString(System.identityHashCode(this));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession$RetryEmitFailureHandler.class */
    public static class RetryEmitFailureHandler implements Sinks.EmitFailureHandler {
        private static final RetryEmitFailureHandler INSTANCE = new RetryEmitFailureHandler();

        private RetryEmitFailureHandler() {
        }

        public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
            return emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WebsocketGatewayClientSession(GatewayClientCodec<ByteBuf> gatewayClientCodec, Connection connection) {
        this.codec = gatewayClientCodec;
        this.connection = connection;
        connection.inbound().receive().retain().subscribe(byteBuf -> {
            if (!byteBuf.isReadable()) {
                ReferenceCountUtil.safestRelease(byteBuf);
                return;
            }
            try {
                ServiceMessage decode = gatewayClientCodec.decode(byteBuf);
                if (!decode.headers().containsKey(STREAM_ID)) {
                    LOGGER.error("Ignore response: {} with null sid, session={}", decode, this.id);
                    Optional.ofNullable(decode.data()).ifPresent(ReferenceCountUtil::safestRelease);
                    return;
                }
                Object obj = this.inboundProcessors.get(Long.valueOf(Long.parseLong(decode.header(STREAM_ID))));
                if (obj == null) {
                    Optional.ofNullable(decode.data()).ifPresent(ReferenceCountUtil::safestRelease);
                } else {
                    handleResponse(decode, obj);
                }
            } catch (Exception e) {
                LOGGER.error("Response decoder failed: " + e);
            }
        });
        connection.onDispose(() -> {
            this.inboundProcessors.forEach((l, obj) -> {
                emitError(obj, CLOSED_CHANNEL_EXCEPTION);
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> Sinks.One<T> newMonoProcessor(long j) {
        return (Sinks.One) this.inboundProcessors.computeIfAbsent(Long.valueOf(j), l -> {
            LOGGER.debug("Put sid={}, session={}", Long.valueOf(j), this.id);
            return Sinks.one();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> Sinks.Many<T> newUnicastProcessor(long j) {
        return (Sinks.Many) this.inboundProcessors.computeIfAbsent(Long.valueOf(j), l -> {
            LOGGER.debug("Put sid={}, session={}", Long.valueOf(j), this.id);
            return Sinks.many().unicast().onBackpressureBuffer();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeProcessor(long j) {
        if (this.inboundProcessors.remove(Long.valueOf(j)) != null) {
            LOGGER.debug("Removed sid={}, session={}", Long.valueOf(j), this.id);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> send(ByteBuf byteBuf) {
        return Mono.defer(() -> {
            return this.connection.outbound().sendObject(Mono.just(byteBuf).map(TextWebSocketFrame::new), obj -> {
                return true;
            }).then();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cancel(long j, String str) {
        send(this.codec.encode(ServiceMessage.builder().qualifier(str).header(STREAM_ID, Long.valueOf(j)).header(SIGNAL, Signal.CANCEL.codeAsString()).build())).subscribe((Consumer) null, th -> {
            LOGGER.error("Exception occurred on sending CANCEL signal for session={}", this.id, th);
        });
    }

    public Mono<Void> close() {
        return this.connection.outbound().sendClose().then();
    }

    public Mono<Void> onClose() {
        return this.connection.onDispose();
    }

    private void handleResponse(ServiceMessage serviceMessage, Object obj) {
        LOGGER.debug("Handle response: {}, session={}", serviceMessage, this.id);
        try {
            Optional map = Optional.ofNullable(serviceMessage.header(SIGNAL)).map(Signal::from);
            if (map.isPresent()) {
                Signal signal = (Signal) map.get();
                if (signal == Signal.COMPLETE) {
                    emitComplete(obj);
                }
                if (signal == Signal.ERROR) {
                    emitNext(obj, this.codec.decodeData(serviceMessage, ErrorData.class));
                }
            } else {
                emitNext(obj, serviceMessage);
            }
        } catch (Exception e) {
            emitError(obj, e);
        }
    }

    private static void emitNext(Object obj, ServiceMessage serviceMessage) {
        if (obj instanceof Sinks.One) {
            ((Sinks.One) obj).emitValue(serviceMessage, RetryEmitFailureHandler.INSTANCE);
        }
        if (obj instanceof Sinks.Many) {
            ((Sinks.Many) obj).emitNext(serviceMessage, RetryEmitFailureHandler.INSTANCE);
        }
    }

    private static void emitComplete(Object obj) {
        if (obj instanceof Sinks.One) {
            ((Sinks.One) obj).emitEmpty(RetryEmitFailureHandler.INSTANCE);
        }
        if (obj instanceof Sinks.Many) {
            ((Sinks.Many) obj).emitComplete(RetryEmitFailureHandler.INSTANCE);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void emitError(Object obj, Exception exc) {
        if (obj instanceof Sinks.One) {
            ((Sinks.One) obj).emitError(exc, RetryEmitFailureHandler.INSTANCE);
        }
        if (obj instanceof Sinks.Many) {
            ((Sinks.Many) obj).emitError(exc, RetryEmitFailureHandler.INSTANCE);
        }
    }

    public String toString() {
        return new StringJoiner(", ", WebsocketGatewayClientSession.class.getSimpleName() + "[", "]").add("id=" + this.id).toString();
    }
}
