package io.scalecube.services.gateway.transport.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.gateway.transport.GatewayClient;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.GatewayClientSettings;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/services/gateway/transport/rsocket/RSocketGatewayClient.class */
public final class RSocketGatewayClient implements GatewayClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketGatewayClient.class);
    private static final AtomicReferenceFieldUpdater<RSocketGatewayClient, Mono> rSocketMonoUpdater = AtomicReferenceFieldUpdater.newUpdater(RSocketGatewayClient.class, Mono.class, "rsocketMono");
    private final GatewayClientSettings settings;
    private final GatewayClientCodec<Payload> codec;
    private volatile Mono<?> rsocketMono;
    private final MonoProcessor<Void> close = MonoProcessor.create();
    private final MonoProcessor<Void> onClose = MonoProcessor.create();
    private final LoopResources loopResources = LoopResources.create("rsocket-gateway-client");

    public RSocketGatewayClient(GatewayClientSettings gatewayClientSettings, GatewayClientCodec<Payload> gatewayClientCodec) {
        this.settings = gatewayClientSettings;
        this.codec = gatewayClientCodec;
        this.close.then(doClose()).doFinally(signalType -> {
            this.onClose.onComplete();
        }).doOnTerminate(() -> {
            LOGGER.info("Closed RSocketGatewayClient resources");
        }).subscribe((Consumer) null, th -> {
            LOGGER.warn("Exception occurred on RSocketGatewayClient close: " + th);
        });
    }

    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public Mono<ServiceMessage> requestResponse(ServiceMessage serviceMessage) {
        return Mono.defer(() -> {
            return getOrConnect().flatMap(rSocket -> {
                return rSocket.requestResponse(toPayload(serviceMessage)).doOnSubscribe(subscription -> {
                    LOGGER.debug("Sending request {}", serviceMessage);
                }).onErrorMap(ClosedChannelException.class, closedChannelException -> {
                    return new ConnectionClosedException("Connection closed");
                });
            }).map(this::toMessage);
        });
    }

    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public Flux<ServiceMessage> requestStream(ServiceMessage serviceMessage) {
        return Flux.defer(() -> {
            return getOrConnect().flatMapMany(rSocket -> {
                return rSocket.requestStream(toPayload(serviceMessage)).doOnSubscribe(subscription -> {
                    LOGGER.debug("Sending request {}", serviceMessage);
                }).onErrorMap(ClosedChannelException.class, closedChannelException -> {
                    return new ConnectionClosedException("Connection closed");
                });
            }).map(this::toMessage);
        });
    }

    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> flux) {
        return Flux.defer(() -> {
            return getOrConnect().flatMapMany(rSocket -> {
                return rSocket.requestChannel(flux.doOnNext(serviceMessage -> {
                    LOGGER.debug("Sending request {}", serviceMessage);
                }).map(this::toPayload)).onErrorMap(ClosedChannelException.class, closedChannelException -> {
                    return new ConnectionClosedException("Connection closed");
                });
            }).map(this::toMessage);
        });
    }

    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public void close() {
        this.close.onComplete();
    }

    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public Mono<Void> onClose() {
        return this.onClose;
    }

    private Mono<Void> doClose() {
        LoopResources loopResources = this.loopResources;
        loopResources.getClass();
        return Mono.defer(loopResources::disposeLater);
    }

    public GatewayClientCodec<Payload> getCodec() {
        return this.codec;
    }

    private Mono<RSocket> getOrConnect() {
        return Mono.defer(() -> {
            return rSocketMonoUpdater.updateAndGet(this, this::getOrConnect0);
        });
    }

    private Mono<RSocket> getOrConnect0(Mono mono) {
        return mono != null ? mono : RSocketFactory.connect().metadataMimeType(this.settings.contentType()).transport(createRSocketTransport(this.settings)).start().doOnSuccess(rSocket -> {
            LOGGER.info("Connected successfully on {}:{}", this.settings.host(), Integer.valueOf(this.settings.port()));
            rSocket.onClose().doOnTerminate(() -> {
                rSocketMonoUpdater.getAndSet(this, null);
                LOGGER.info("Connection closed on {}:{}", this.settings.host(), Integer.valueOf(this.settings.port()));
            }).subscribe((Consumer) null, th -> {
                LOGGER.warn("Exception on closing rsocket: {}", th.toString());
            });
        }).doOnError(th -> {
            LOGGER.warn("Failed to connect on {}:{}, cause: {}", new Object[]{this.settings.host(), Integer.valueOf(this.settings.port()), th.toString()});
            rSocketMonoUpdater.getAndSet(this, null);
        }).cache();
    }

    private WebsocketClientTransport createRSocketTransport(GatewayClientSettings gatewayClientSettings) {
        return WebsocketClientTransport.create(HttpClient.newConnection().followRedirect(gatewayClientSettings.followRedirect()).tcpConfiguration(tcpClient -> {
            if (gatewayClientSettings.sslProvider() != null) {
                tcpClient = tcpClient.secure(gatewayClientSettings.sslProvider());
            }
            return tcpClient.runOn(this.loopResources).host(gatewayClientSettings.host()).port(gatewayClientSettings.port());
        }), "/");
    }

    private Payload toPayload(ServiceMessage serviceMessage) {
        return this.codec.encode(serviceMessage);
    }

    private ServiceMessage toMessage(Payload payload) {
        ServiceMessage decode = this.codec.decode(payload);
        LOGGER.debug("Received response {}", decode);
        return decode;
    }
}
