package net.soundvibe.reacto.client.events;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketStream;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.net.URI;
import java.util.Objects;
import net.soundvibe.reacto.client.errors.ConnectionClosedUnexpectedly;
import net.soundvibe.reacto.internal.InternalEvent;
import net.soundvibe.reacto.mappers.Mappers;
import net.soundvibe.reacto.server.handlers.WebSocketFrameHandler;
import net.soundvibe.reacto.types.Command;
import net.soundvibe.reacto.types.Event;
import net.soundvibe.reacto.types.ReactiveException;
import net.soundvibe.reacto.utils.Factories;
import rx.Observable;
import rx.Subscriber;

/* loaded from: input_file:net/soundvibe/reacto/client/events/VertxWebSocketEventHandler.class */
public class VertxWebSocketEventHandler implements EventHandler {
    private static final Logger log = LoggerFactory.getLogger(VertxWebSocketEventHandler.class);
    private final URI wsUrl;

    public VertxWebSocketEventHandler(URI uri) {
        Objects.requireNonNull(uri, "WebSocket URI cannot be null");
        this.wsUrl = uri;
    }

    @Override // net.soundvibe.reacto.client.events.EventHandler
    public Observable<Event> toObservable(Command command) {
        try {
            return Observable.using(() -> {
                return Factories.vertx().createHttpClient(new HttpClientOptions());
            }, httpClient -> {
                return observe(httpClient.websocketStream(getPortFromURI(this.wsUrl), this.wsUrl.getHost(), this.wsUrl.getPath()), command);
            }, (v0) -> {
                v0.close();
            });
        } catch (Throwable th) {
            return Observable.error(th);
        }
    }

    public static Observable<Event> observe(WebSocketStream webSocketStream, Command command) {
        return Observable.create(subscriber -> {
            try {
                subscriber.getClass();
                webSocketStream.exceptionHandler(subscriber::onError).handler(webSocket -> {
                    try {
                        WebSocket closeHandler = webSocket.setWriteQueueMaxSize(Integer.MAX_VALUE).closeHandler(r8 -> {
                            if (subscriber.isUnsubscribed()) {
                                return;
                            }
                            subscriber.onError(new ConnectionClosedUnexpectedly("WebSocket connection closed without completion for command: " + command));
                        });
                        subscriber.getClass();
                        closeHandler.exceptionHandler(subscriber::onError);
                        sendCommandToExecutor(command, webSocket);
                        checkForEvents(webSocket, subscriber);
                    } catch (Throwable th) {
                        subscriber.onError(th);
                    }
                });
            } catch (Throwable th) {
                subscriber.onError(th);
            }
        });
    }

    private static void checkForEvents(WebSocket webSocket, Subscriber<? super Event> subscriber) {
        webSocket.frameHandler(new WebSocketFrameHandler(buffer -> {
            try {
                if (!subscriber.isUnsubscribed()) {
                    handleEvent(Mappers.fromBytesToInternalEvent(buffer.getBytes()), subscriber);
                }
            } catch (Throwable th) {
                subscriber.onError(th);
            }
        }));
    }

    private static void handleEvent(InternalEvent internalEvent, Subscriber<? super Event> subscriber) {
        log.debug("InternalEvent has been received and is being handled: " + internalEvent);
        switch (internalEvent.eventType) {
            case NEXT:
                subscriber.onNext(Mappers.fromInternalEvent(internalEvent));
                return;
            case ERROR:
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onError(internalEvent.error.orElse(ReactiveException.from(new UnknownError("Unknown error from internalEvent: " + internalEvent))));
                return;
            case COMPLETED:
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onCompleted();
                return;
            default:
                return;
        }
    }

    private int getPortFromURI(URI uri) {
        if (uri.getPort() == -1) {
            return 80;
        }
        return uri.getPort();
    }

    private static void sendCommandToExecutor(Command command, WebSocket webSocket) {
        log.info("Sending command to executor: " + command);
        webSocket.writeBinaryMessage(Buffer.buffer(Mappers.commandToBytes(command)));
    }
}
