package io.scalecube.services.gateway.transport.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.gateway.transport.GatewayClient;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.GatewayClientSettings;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.class */
/**
 * {@link GatewayClient} implementation that talks to a gateway over a single websocket
 * connection.
 *
 * <p>The websocket session is established lazily on the first request and then shared by all
 * subsequent requests. When the session closes, or the connection attempt fails, the cached
 * session is dropped so that the next request triggers a new connection attempt.
 *
 * <p>Each request is tagged with a stream id taken from a per-client counter; the id is sent in
 * the {@code sid} header and is used to register and remove the per-request processor on the
 * session.
 */
public final class WebsocketGatewayClient implements GatewayClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGatewayClient.class);

    /** Header carrying the stream id of a request. */
    private static final String STREAM_ID = "sid";

    /** Header carrying a signal code (used here for CANCEL). */
    private static final String SIGNAL = "sig";

    // Raw Mono is unavoidable here: the updater is created from the Mono.class literal.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<WebsocketGatewayClient, Mono> websocketMonoUpdater =
            AtomicReferenceFieldUpdater.newUpdater(WebsocketGatewayClient.class, Mono.class, "websocketMono");

    private final GatewayClientCodec<ByteBuf> codec;
    private final GatewayClientSettings settings;
    private final HttpClient httpClient;
    private final AtomicLong sidCounter = new AtomicLong();
    private final MonoProcessor<Void> close = MonoProcessor.create();
    private final MonoProcessor<Void> onClose = MonoProcessor.create();
    private final LoopResources loopResources = LoopResources.create("websocket-gateway-client");

    /** Cached (replayed) connection mono; accessed only through {@link #websocketMonoUpdater}. */
    private volatile Mono<?> websocketMono;

    /**
     * Creates a client. No connection is opened here; the websocket is connected on the first
     * request.
     *
     * @param gatewayClientSettings connection settings (host, port, headers, ssl, wiretap,
     *     redirects, keep-alive interval)
     * @param gatewayClientCodec codec used to encode outgoing messages; also handed to every
     *     created session
     */
    public WebsocketGatewayClient(GatewayClientSettings gatewayClientSettings, GatewayClientCodec<ByteBuf> gatewayClientCodec) {
        this.settings = gatewayClientSettings;
        this.codec = gatewayClientCodec;
        this.httpClient = HttpClient.newConnection()
                // Settings headers are read each time the header callback runs, not once up front.
                .headers(httpHeaders -> gatewayClientSettings.headers().forEach(httpHeaders::add))
                .followRedirect(gatewayClientSettings.followRedirect())
                .tcpConfiguration(tcpClient -> {
                    if (gatewayClientSettings.sslProvider() != null) {
                        tcpClient = tcpClient.secure(gatewayClientSettings.sslProvider());
                    }
                    return tcpClient
                            .wiretap(gatewayClientSettings.wiretap())
                            .runOn(this.loopResources)
                            .host(gatewayClientSettings.host())
                            .port(gatewayClientSettings.port());
                });

        // Completing `close` (see close()) disposes the loop resources and then completes `onClose`.
        this.close
                .then(doClose())
                .doFinally(signalType -> this.onClose.onComplete())
                .doOnTerminate(() -> LOGGER.info("Closed WebsocketGatewayClient resources"))
                .subscribe(
                        null,
                        th -> LOGGER.warn("Exception occurred on WebsocketGatewayClient close: {}", th.toString()));
    }

    /**
     * Sends the request over the shared websocket session and returns the single response.
     *
     * <p>Cancelling the returned mono sends a CANCEL signal for the stream id; on any termination
     * the per-request processor is removed from the session.
     *
     * @param serviceMessage request to send
     * @return mono of the response message
     */
    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public Mono<ServiceMessage> requestResponse(ServiceMessage serviceMessage) {
        return Mono.defer(() -> {
            // A fresh stream id per subscription.
            long sid = this.sidCounter.incrementAndGet();
            return getOrConnect().flatMap(session ->
                    session.send(encodeRequest(serviceMessage, sid), sid)
                            .doOnSubscribe(subscription -> LOGGER.debug("Sending request {}", serviceMessage))
                            .then(session.newMonoProcessor(sid))
                            .doOnCancel(() -> handleCancel(sid, serviceMessage.qualifier(), session))
                            .doFinally(signalType -> session.removeProcessor(sid)));
        });
    }

    /**
     * Sends the request over the shared websocket session and returns the stream of responses.
     *
     * <p>Cancelling the returned flux sends a CANCEL signal for the stream id; on any termination
     * the per-request processor is removed from the session.
     *
     * @param serviceMessage request to send
     * @return flux of response messages
     */
    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public Flux<ServiceMessage> requestStream(ServiceMessage serviceMessage) {
        return Flux.defer(() -> {
            // A fresh stream id per subscription.
            long sid = this.sidCounter.incrementAndGet();
            return getOrConnect().flatMapMany(session ->
                    session.send(encodeRequest(serviceMessage, sid), sid)
                            .doOnSubscribe(subscription -> LOGGER.debug("Sending request {}", serviceMessage))
                            .thenMany(session.newUnicastProcessor(sid))
                            .doOnCancel(() -> handleCancel(sid, serviceMessage.qualifier(), session))
                            .doFinally(signalType -> session.removeProcessor(sid)));
        });
    }

    /**
     * Not supported by this transport.
     *
     * @param flux ignored
     * @return a flux that fails with {@link UnsupportedOperationException}
     */
    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> flux) {
        return Flux.error(new UnsupportedOperationException("requestChannel is not supported by WebSocket transport implementation"));
    }

    /** Triggers the close sequence wired in the constructor; completion is observable via {@link #onClose()}. */
    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public void close() {
        this.close.onComplete();
    }

    /**
     * Returns a mono that completes once the close sequence has finished.
     *
     * @return completion signal of the close sequence
     */
    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public Mono<Void> onClose() {
        return this.onClose;
    }

    /** Defers disposal of the client's loop resources until subscription. */
    private Mono<Void> doClose() {
        return Mono.defer(this.loopResources::disposeLater);
    }

    /**
     * Returns the codec this client was created with.
     *
     * @return message codec
     */
    public GatewayClientCodec<ByteBuf> getCodec() {
        return this.codec;
    }

    /**
     * Returns the cached connection mono, installing a new one if none is cached.
     *
     * <p>The unchecked conversion is confined to this method: the field updater only knows the raw
     * {@code Mono} type, while the only values ever stored come from {@link #getOrConnect0(Mono)}.
     */
    @SuppressWarnings("unchecked")
    private Mono<WebsocketGatewayClientSession> getOrConnect() {
        return Mono.defer(() -> websocketMonoUpdater.updateAndGet(this, this::getOrConnect0));
    }

    /**
     * Update function for the cached connection mono.
     *
     * @param mono currently cached value, or {@code null} if none
     * @return {@code mono} itself when present, otherwise a new cached mono that connects the
     *     websocket and creates a session when subscribed
     */
    private Mono<WebsocketGatewayClientSession> getOrConnect0(Mono<WebsocketGatewayClientSession> mono) {
        if (mono != null) {
            return mono;
        }
        Duration keepAliveInterval = this.settings.keepAliveInterval();
        return this.httpClient
                .websocket()
                .uri("/")
                .connect()
                .map(connection -> applyKeepAlive(connection, keepAliveInterval))
                .map(this::createSession)
                .doOnError(th -> {
                    // toString() so the cause fills the placeholder; a trailing Throwable would be
                    // consumed by SLF4J as the exception and leave "{}" unformatted.
                    LOGGER.warn("Failed to connect on {}:{}, cause: {}", this.settings.host(), this.settings.port(), th.toString());
                    // Drop the failed mono so the next request retries the connection.
                    websocketMonoUpdater.getAndSet(this, null);
                })
                .cache();
    }

    /**
     * Installs read/write idle handlers that send keep-alive pings, unless the interval is zero.
     *
     * @param connection freshly established connection
     * @param keepAliveInterval idle interval; a zero duration disables keep-alive
     * @return the connection with idle handlers installed, or the same connection untouched
     */
    private Connection applyKeepAlive(Connection connection, Duration keepAliveInterval) {
        // Value comparison: any zero-length duration disables keep-alive, not only Duration.ZERO itself.
        if (keepAliveInterval.isZero()) {
            return connection;
        }
        long idleMillis = keepAliveInterval.toMillis();
        return connection
                .onReadIdle(idleMillis, () -> onReadIdle(connection))
                .onWriteIdle(idleMillis, () -> onWriteIdle(connection));
    }

    /**
     * Wraps the connection into a session and subscribes to its close signal so that the cached
     * connection mono is cleared when the session terminates.
     */
    private WebsocketGatewayClientSession createSession(Connection connection) {
        WebsocketGatewayClientSession session = new WebsocketGatewayClientSession(this.codec, connection);
        LOGGER.info("Created {} on {}:{}", session, this.settings.host(), this.settings.port());
        session.onClose()
                .doOnTerminate(() -> {
                    websocketMonoUpdater.getAndSet(this, null);
                    LOGGER.info("Closed {} on {}:{}", session, this.settings.host(), this.settings.port());
                })
                .subscribe(
                        null,
                        th -> LOGGER.warn("Exception on closing session={}, cause: {}", session.id(), th.toString()));
        return session;
    }

    /** Sends a keep-alive ping when nothing has been written for the keep-alive interval. */
    private void onWriteIdle(Connection connection) {
        sendKeepAlive(connection, "writeIdle");
    }

    /** Sends a keep-alive ping when nothing has been read for the keep-alive interval. */
    private void onReadIdle(Connection connection) {
        sendKeepAlive(connection, "readIdle");
    }

    /**
     * Sends a websocket ping frame; failures are logged and otherwise ignored (best effort).
     *
     * @param connection connection to ping
     * @param trigger name of the idle event that caused the ping, used in log messages only
     */
    private void sendKeepAlive(Connection connection, String trigger) {
        LOGGER.debug("Sending keepalive on {}", trigger);
        connection
                .outbound()
                .sendObject(new PingWebSocketFrame())
                .then()
                .subscribe(null, th -> LOGGER.warn("Can't send keepalive on {}: {}", trigger, th.toString()));
    }

    /**
     * Sends a CANCEL signal for the given stream; failures are logged and otherwise ignored.
     *
     * @param j stream id being cancelled
     * @param str qualifier of the original request
     * @param websocketGatewayClientSession session the request was sent on
     */
    private void handleCancel(long j, String str, WebsocketGatewayClientSession websocketGatewayClientSession) {
        ByteBuf cancelFrame = this.codec.encode(
                ServiceMessage.builder()
                        .qualifier(str)
                        .header(STREAM_ID, j)
                        .header(SIGNAL, Signal.CANCEL.codeAsString())
                        .build());
        websocketGatewayClientSession
                .send(cancelFrame, j)
                .subscribe(
                        null,
                        th -> LOGGER.error("Exception on sending CANCEL signal for session={}", websocketGatewayClientSession.id(), th));
    }

    /**
     * Copies the message, adds the stream id header and encodes it.
     *
     * @param serviceMessage request message
     * @param j stream id to attach
     * @return encoded request
     */
    private ByteBuf encodeRequest(ServiceMessage serviceMessage, long j) {
        return this.codec.encode(ServiceMessage.from(serviceMessage).header(STREAM_ID, j).build());
    }
}
