package io.scalecube.gateway.clientsdk.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.gateway.clientsdk.ClientCodec;
import io.scalecube.gateway.clientsdk.ClientMessage;
import io.scalecube.gateway.clientsdk.ClientSettings;
import io.scalecube.gateway.clientsdk.ClientTransport;
import io.scalecube.gateway.clientsdk.exceptions.ConnectionClosedException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/gateway/clientsdk/rsocket/RSocketClientTransport.class */
/**
 * {@link ClientTransport} implementation that exchanges messages over RSocket using a websocket
 * connection.
 *
 * <p>The connection is created lazily by the first request and its {@link Mono} is cached, so
 * later requests share the same {@link RSocket}. When the socket closes, or a connection attempt
 * fails, the cached reference is cleared and the next request starts a new connection attempt.
 */
public final class RSocketClientTransport implements ClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientTransport.class);

    // The updater has to be declared against the raw Mono class because there is no class literal
    // for Mono<RSocket>; the unchecked conversion is confined to getOrConnect().
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<RSocketClientTransport, Mono> rSocketMonoUpdater =
        AtomicReferenceFieldUpdater.newUpdater(RSocketClientTransport.class, Mono.class, "rsocketMono");

    private final ClientSettings settings;
    private final ClientCodec<Payload> codec;
    private final LoopResources loopResources;

    // Written only through rSocketMonoUpdater (looked up by field name, do not rename).
    // null means "no connection has been requested, or the last one was closed/failed".
    private volatile Mono<RSocket> rsocketMono;

    /**
     * Creates a transport; no connection is opened until the first request is subscribed.
     *
     * @param clientSettings supplies host, port and content type of the remote gateway
     * @param clientCodec converts between {@link ClientMessage} and RSocket {@link Payload}
     * @param loopResources event loop resources the underlying TCP client runs on
     */
    public RSocketClientTransport(ClientSettings clientSettings, ClientCodec<Payload> clientCodec, LoopResources loopResources) {
        this.settings = clientSettings;
        this.codec = clientCodec;
        this.loopResources = loopResources;
    }

    /**
     * Encodes the message, sends it as an RSocket request-response and decodes the reply.
     *
     * <p>The pending request is bound to the socket's close signal, which is surfaced as a
     * {@link ConnectionClosedException}.
     *
     * @param clientMessage request to send
     * @return mono of the decoded response; work starts on subscription
     */
    @Override // io.scalecube.gateway.clientsdk.ClientTransport
    public Mono<ClientMessage> requestResponse(ClientMessage clientMessage) {
        return Mono.defer(() -> {
            Payload payload = toPayload(clientMessage);
            return getOrConnect()
                .flatMap(rSocket -> rSocket.requestResponse(payload).takeUntilOther(listenConnectionClose(rSocket)))
                .map(this::toMessage);
        });
    }

    /**
     * Encodes the message, sends it as an RSocket request-stream and decodes every reply.
     *
     * <p>The stream is bound to the socket's close signal, which is surfaced as a
     * {@link ConnectionClosedException}.
     *
     * @param clientMessage request to send
     * @return flux of decoded responses; work starts on subscription
     */
    @Override // io.scalecube.gateway.clientsdk.ClientTransport
    public Flux<ClientMessage> requestStream(ClientMessage clientMessage) {
        return Flux.defer(() -> {
            Payload payload = toPayload(clientMessage);
            return getOrConnect()
                .flatMapMany(rSocket -> rSocket.requestStream(payload).takeUntilOther(listenConnectionClose(rSocket)))
                .map(this::toMessage);
        });
    }

    /**
     * Disposes the currently cached connection, if there is one.
     *
     * @return mono that terminates once the socket's {@code onClose()} terminates, or immediately
     *     when no connection is cached
     */
    @Override // io.scalecube.gateway.clientsdk.ClientTransport
    public Mono<Void> close() {
        return Mono.defer(() -> {
            Mono<RSocket> connection = this.rsocketMono;
            Mono<Void> shutdown = connection == null ? Mono.empty() : connection.flatMap(this::dispose);
            return shutdown.doOnTerminate(() -> LOGGER.info("Closed rsocket client sdk transport"));
        });
    }

    /** Disposes the socket and returns the signal that terminates once it is closed. */
    private Mono<Void> dispose(RSocket rSocket) {
        rSocket.dispose();
        return rSocket.onClose();
    }

    /**
     * Returns the cached connection mono, installing a new one atomically when none is present.
     *
     * <p>The update function may be re-applied under contention; that is harmless here because
     * {@link #connect()} only assembles a cold mono and nothing connects until it is subscribed.
     */
    @SuppressWarnings("unchecked") // the field only ever holds null or a Mono<RSocket> from connect()
    private Mono<RSocket> getOrConnect() {
        return Mono.defer(() ->
            (Mono<RSocket>) rSocketMonoUpdater.updateAndGet(this, current -> current != null ? current : connect()));
    }

    /**
     * Assembles a cached mono that connects to the configured host and port.
     *
     * <p>On success it subscribes to the socket's close signal so the cached reference is cleared
     * when the connection goes away; on failure the reference is cleared right away.
     */
    private Mono<RSocket> connect() {
        return RSocketFactory.connect()
            .metadataMimeType(this.settings.contentType())
            .frameDecoder(frame -> ByteBufPayload.create(frame.sliceData().retain(), frame.sliceMetadata().retain()))
            .transport(createRSocketTransport(this.settings))
            .start()
            .doOnSuccess(rSocket -> {
                LOGGER.info("Connected successfully on {}:{}", this.settings.host(), this.settings.port());
                rSocket.onClose().doOnTerminate(() -> {
                    resetConnection();
                    LOGGER.info("Connection closed on {}:{}", this.settings.host(), this.settings.port());
                }).subscribe();
            })
            .doOnError(th -> {
                LOGGER.warn("Failed to connect on {}:{}, cause: {}", this.settings.host(), this.settings.port(), th);
                resetConnection();
            })
            .cache();
    }

    /** Clears the cached connection so that the next request triggers a new connection attempt. */
    private void resetConnection() {
        rSocketMonoUpdater.set(this, null);
    }

    /** Builds a websocket transport for path "/" targeting the host and port from the settings. */
    private WebsocketClientTransport createRSocketTransport(ClientSettings clientSettings) {
        return WebsocketClientTransport.create(
            HttpClient.newConnection().tcpConfiguration(tcpClient ->
                tcpClient.runOn(this.loopResources).host(clientSettings.host()).port(clientSettings.port())),
            "/");
    }

    private Payload toPayload(ClientMessage clientMessage) {
        return this.codec.encode(clientMessage);
    }

    private ClientMessage toMessage(Payload payload) {
        return this.codec.decode(payload);
    }

    /**
     * Returns a mono that fails with {@link ConnectionClosedException} once the socket's
     * {@code onClose()} completes; an error from {@code onClose()} itself is passed through.
     */
    private <T> Mono<T> listenConnectionClose(RSocket rSocket) {
        // onClose() is a Mono<Void> and never emits a value, so then(...) switches to the error
        // mono exactly when the close signal completes, without needing an unchecked cast.
        return rSocket.onClose().then(Mono.defer(this::toConnectionClosedException));
    }

    /** Creates the error signal; invoked per subscription so each failure gets a new exception. */
    private <T> Mono<T> toConnectionClosedException() {
        return Mono.error(new ConnectionClosedException("Connection closed"));
    }
}
