package reactmann;

import com.aphyr.riemann.Proto;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.impl.LoggerFactory;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.streams.WriteStream;
import io.vertx.rx.java.ObservableFuture;
import io.vertx.rx.java.RxHelper;

/* loaded from: input_file:reactmann/TcpMessageVerticle.class */
public class TcpMessageVerticle extends AbstractVerticle {
    private static final Logger log = LoggerFactory.getLogger(TcpMessageVerticle.class);

    public void start() {
        Integer integer = Integer.getInteger("tcp.port", 5555);
        ObservableFuture observableFuture = RxHelper.observableFuture();
        NetServer createNetServer = this.vertx.createNetServer(new NetServerOptions().setPort(integer.intValue()));
        observableFuture.subscribe(netServer -> {
            log.info("Starting TCP listener..");
        }, th -> {
            log.error("Could not start TCP listener on port " + integer, th);
        }, () -> {
            log.info("Started TCP listener on port " + integer + ".");
        });
        RxHelper.toObservable(createNetServer.connectStream()).flatMap(netSocket -> {
            return Riemann.convertBufferStreamToMessages(netSocket, RxHelper.toObservable(netSocket));
        }).subscribe(tup2 -> {
            sendResponse(Proto.Msg.newBuilder().setOk(true).build(), (WriteStream) tup2.getLeft());
            this.vertx.eventBus().publish("riemann.stream", ((Proto.Msg) tup2.getRight()).toByteArray());
        }, th2 -> {
            log.error(th2);
            if (th2 instanceof NetSocketException) {
                sendResponse(Proto.Msg.newBuilder().setError(th2.getMessage()).build(), ((NetSocketException) th2).getSocket());
            }
        });
        createNetServer.listen(observableFuture.asHandler());
    }

    private void sendResponse(Proto.Msg msg, WriteStream<Buffer> writeStream) {
        byte[] byteArray = msg.toByteArray();
        Buffer buffer = Buffer.buffer();
        buffer.appendInt(byteArray.length);
        buffer.appendBytes(byteArray);
        writeStream.write(buffer);
    }
}
