package reactmann;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.impl.LoggerFactory;
import io.vertx.rx.java.ObservableFuture;
import io.vertx.rx.java.RxHelper;
import reactmann.observables.EventObservable;
import reactmann.subscribers.EventToJsonAction;

/* loaded from: input_file:reactmann/WebSocketVerticle.class */
public class WebSocketVerticle extends AbstractVerticle {
    private static final Logger log = LoggerFactory.getLogger(WebSocketVerticle.class);

    public void start() throws Exception {
        Integer integer = Integer.getInteger("websocket.port", 5556);
        ObservableFuture observableFuture = RxHelper.observableFuture();
        HttpServer createHttpServer = this.vertx.createHttpServer(new HttpServerOptions().setPort(integer.intValue()));
        observableFuture.subscribe(httpServer -> {
            log.info("Starting web socket listener...");
        }, th -> {
            log.error("Could not start web socket listener at port " + integer, th);
        }, () -> {
            log.info("Started web socket listener on port " + integer);
        });
        EventObservable.convertFromWebSocketObservable(RxHelper.toObservable(createHttpServer.websocketStream())).subscribe(new EventToJsonAction(Riemann.getEvents(this.vertx), WebSocketFrameImpl::new), th2 -> {
            log.error(th2);
        });
        createHttpServer.listen(observableFuture.asHandler());
    }
}
