package reactmann;

import com.aphyr.riemann.Proto;
import com.google.protobuf.InvalidProtocolBufferException;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.WriteStream;
import io.vertx.rx.java.RxHelper;
import reactmann.subscribers.BufferAction;
import rx.Observable;

/* loaded from: input_file:reactmann/Riemann.class */
/**
 * Static helpers that turn Vert.x streams carrying Riemann protobuf payloads
 * into RxJava observables. Not instantiable.
 */
public class Riemann {
    /** Event-bus address whose message bodies are decoded by {@link #getEvents(Vertx)}. */
    private static final String EVENT_BUS_ADDRESS = "riemann.stream";

    /**
     * Number of leading bytes dropped from each buffer before protobuf parsing.
     * NOTE(review): presumably the length prefix of a framed message — confirm
     * against {@code BufferAction}.
     */
    private static final int FRAME_HEADER_BYTES = 4;

    private Riemann() {
    }

    /**
     * Streams every event published on the {@code riemann.stream} event-bus address.
     *
     * <p>Each message body is cast to {@code byte[]}, parsed as a {@link Proto.Msg},
     * and each event in that message is converted with {@code Event.fromProtoBufEvent}.
     *
     * @param vertx the Vert.x instance whose event bus is consumed
     * @return an observable of the converted events
     */
    public static Observable<Event> getEvents(Vertx vertx) {
        Observable<Object> payloads = RxHelper.toObservable(
                vertx.eventBus().consumer(EVENT_BUS_ADDRESS).bodyStream());
        return payloads
                .flatMap(Riemann::decodeEvents)
                .map(Event::fromProtoBufEvent);
    }

    /**
     * Pairs every message decoded from {@code observable} with the stream {@code t}.
     *
     * <p>The incoming buffers are first passed through a {@link BufferAction}; each
     * buffer it emits is parsed as a {@link Proto.Msg} after skipping its first four
     * bytes. Any exception raised while parsing is rethrown as a
     * {@link NetSocketException} carrying {@code t} and the original cause.
     *
     * @param t          the write stream associated with the incoming buffers
     * @param observable the raw buffers to decode
     * @param <T>        the concrete write-stream type
     * @return an observable of {@code (t, message)} tuples
     */
    public static <T extends WriteStream<Buffer>> Observable<Tup2<T, Proto.Msg>> convertBufferStreamToMessages(T t, Observable<Buffer> observable) {
        return Observable.create(new BufferAction(observable))
                .map(frame -> decodeFrame(t, frame))
                .map(message -> Tup2.create(t, message));
    }

    /**
     * Parses one event-bus body as a protobuf message and exposes its events.
     *
     * @throws RuntimeException wrapping the {@link InvalidProtocolBufferException}
     *                          when the bytes are not a valid message
     */
    private static Observable<Proto.Event> decodeEvents(Object payload) {
        final Proto.Msg message;
        try {
            message = Proto.Msg.parseFrom((byte[]) payload);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
        return Observable.from(message.getEventsList());
    }

    /**
     * Parses the bytes of {@code frame} that follow the header as a protobuf message.
     *
     * @throws NetSocketException carrying {@code origin} and the cause on any failure
     */
    private static <T extends WriteStream<Buffer>> Proto.Msg decodeFrame(T origin, Buffer frame) {
        try {
            return Proto.Msg.parseFrom(frame.getBytes(FRAME_HEADER_BYTES, frame.length()));
        } catch (Exception e) {
            // Broad catch is intentional: every failure is tied back to its stream.
            throw new NetSocketException(origin, e);
        }
    }
}
