RxHelper

open class RxHelper

A set of helpers for RxJava and Vert.x.

Author
Julien Viet

Constructors

<init>

RxHelper()

A set of helpers for RxJava and Vert.x.

Functions

blockingScheduler

open static fun blockingScheduler(vertx: Vertx): Scheduler
open static fun blockingScheduler(vertx: Vertx, ordered: Boolean): Scheduler

Create a scheduler for a Vertx object. Actions can be blocking: they are not executed on the Vert.x event loop.

observableFuture

open static fun <T : Any> observableFuture(): ObservableFuture<T>

Create a new ObservableFuture<T> object: an rx.Observable implementation implementing Handler<AsyncResult<T>>. When the async result handler completes, the observable produces the result and completes immediately after; when it fails, it signals the error.

observableHandler

open static fun <T : Any> observableHandler(): ObservableHandler<T>

Create a new ObservableHandler<T> object: an rx.Observable implementation implementing Handler<T>. When the event handler completes, the observable produces the event and completes immediately after.

open static fun <T : Any> observableHandler(multi: Boolean): ObservableHandler<T>

Create a new ObservableHandler<T> object: an rx.Observable implementation implementing Handler<T>. When multi is false and the event handler completes, the observable produces the event and completes immediately after, as a single event is expected.
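The difference between the single-event and multi-event modes can be sketched with a small plain-JDK model. This is only an illustration of the behaviour described above, not the Vert.x implementation: the class SingleOrMultiHandler and its members are hypothetical names, and dropping events that arrive after completion is an assumption of the model.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model of single-event vs multi-event handling.
// SingleOrMultiHandler is a hypothetical name, not a Vert.x or RxJava type.
final class SingleOrMultiHandler<T> {
    private final boolean multi;
    private final List<String> signals = new ArrayList<>();
    private boolean completed;

    SingleOrMultiHandler(boolean multi) {
        this.multi = multi;
    }

    // Plays the role of Handler<T>.handle(event).
    void handle(T event) {
        if (completed) {
            return; // assumption: nothing is emitted after completion
        }
        signals.add("next:" + event);
        if (!multi) {
            // Single-event mode: complete right after the first event.
            completed = true;
            signals.add("complete");
        }
    }

    // Signals recorded so far, in emission order.
    List<String> signals() {
        return signals;
    }

    public static void main(String[] args) {
        SingleOrMultiHandler<String> single = new SingleOrMultiHandler<>(false);
        single.handle("a");
        single.handle("b");
        System.out.println(single.signals()); // prints [next:a, complete]

        SingleOrMultiHandler<String> many = new SingleOrMultiHandler<>(true);
        many.handle("a");
        many.handle("b");
        System.out.println(many.signals()); // prints [next:a, next:b]
    }
}
```

In the model, multi = false corresponds to callbacks that fire exactly once, while multi = true corresponds to handlers that may be invoked repeatedly.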

scheduler

open static fun scheduler(vertx: Vertx): Scheduler

Create a scheduler for a Vertx object. Actions are executed on the event loop.

open static fun scheduler(context: Context): Scheduler

Create a scheduler for a Context. Actions are executed on the event loop of this context.

schedulerHook

open static fun schedulerHook(context: Context): RxJavaSchedulersHook

Create a scheduler hook for a Context object. The hook's rx.plugins.RxJavaSchedulersHook#getIOScheduler() uses a blocking scheduler.

open static fun schedulerHook(vertx: Vertx): RxJavaSchedulersHook

Create a scheduler hook for a Vertx object. The hook's rx.plugins.RxJavaSchedulersHook#getIOScheduler() uses a blocking scheduler.

toFuture

open static fun <T : Any> toFuture(observer: Observer<T>): Handler<AsyncResult<T>>

Adapt an Observer as a Handler<AsyncResult<T>>.

open static fun <T : Any> toFuture(onNext: Action1<T>): Handler<AsyncResult<T>>

Adapt an item callback as a Handler<AsyncResult<T>>.

open static fun <T : Any> toFuture(onNext: Action1<T>, onError: Action1<Throwable>): Handler<AsyncResult<T>>
open static fun <T : Any> toFuture(onNext: Action1<T>, onError: Action1<Throwable>, onComplete: Action0): Handler<AsyncResult<T>>

Adapt an item callback and an error callback as a Handler<AsyncResult<T>>. The three-argument overload also takes a completion callback.

toHandler

open static fun <T : Any> toHandler(observer: Observer<T>): Handler<T>

Adapt an Observer as a Handler<T>. When the event handler completes, the observer completes immediately after the event is received, as a single event is expected.

open static fun <T : Any> toHandler(observer: Observer<T>, multi: Boolean): Handler<T>

Adapt an Observer as a Handler<T>. When multi is false and the event handler completes, the observer completes immediately after the event is received, as a single event is expected.

open static fun <T : Any> toHandler(onNext: Action1<T>): Handler<T>

Adapt an item callback as a Handler<T>.

toObservable

open static fun <T : Any> toObservable(stream: ReadStream<T>): Observable<T>

Adapts a Vert.x ReadStream<T> to an RxJava Observable<T>. After the stream is adapted to an observable, the original stream handlers should not be used anymore as they will be used by the observable adapter.

The adapter supports reactive pull back-pressure.

When back-pressure is enabled, a buffer of ObservableReadStream#DEFAULT_MAX_BUFFER_SIZE items is maintained:

  • When the buffer is full, the stream is paused
  • When the buffer is half empty, the stream is resumed
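The pause-when-full, resume-when-half-empty policy above can be sketched with a plain-JDK model. This is a minimal illustration under stated assumptions, not the ObservableReadStream implementation: PausingBuffer and its members are hypothetical names, and "half empty" is interpreted here as the buffer holding at most maxSize / 2 items.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-JDK model of the buffering policy: pause when full, resume when half empty.
// PausingBuffer is a hypothetical name, not part of Vert.x.
final class PausingBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int maxSize;
    private boolean paused;

    PausingBuffer(int maxSize) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
    }

    // Called when the source stream delivers an item.
    void offer(T item) {
        items.addLast(item);
        if (items.size() >= maxSize) {
            paused = true; // buffer full: models pausing the stream
        }
    }

    // Called when the subscriber requests one item; returns null if the buffer is empty.
    T poll() {
        T item = items.pollFirst();
        if (paused && items.size() <= maxSize / 2) {
            paused = false; // half empty (assumed threshold): models resuming the stream
        }
        return item;
    }

    boolean isPaused() {
        return paused;
    }

    int size() {
        return items.size();
    }
}
```

With a maximum of 4, the model pauses on the fourth buffered item, stays paused after one item is consumed, and resumes once only two items remain. Resuming at the halfway mark instead of at the first free slot avoids rapid pause/resume toggling when the consumer is only slightly slower than the producer.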

open static fun <T : Any> toObservable(stream: ReadStream<T>, maxBufferSize: Int): Observable<T>

Adapts a Vert.x ReadStream<T> to an RxJava Observable<T>. After the stream is adapted to an observable, the original stream handlers should not be used anymore as they will be used by the observable adapter.

The adapter supports reactive pull back-pressure.

When back-pressure is enabled, a buffer of maxBufferSize items is maintained:

  • When the buffer is full, the stream is paused
  • When the buffer is half empty, the stream is resumed

open static fun <T : Any, R : Any> toObservable(stream: ReadStream<T>, adapter: Function<T, R>): Observable<R>

Like #toObservable(ReadStream) but with a function that adapts the items.

open static fun <T : Any, R : Any> toObservable(stream: ReadStream<T>, adapter: Function<T, R>, maxBufferSize: Int): Observable<R>

Like #toObservable(ReadStream, int) but with a function that adapts the items.

toReadStream

open static fun <T : Any> toReadStream(observable: Observable<T>): ReadStream<T>

Adapts an RxJava Observable<T> to a Vert.x ReadStream<T>. The returned read stream will be subscribed to the Observable<T>.

toSubscriber

open static fun <T : Any> toSubscriber(handler: Handler<AsyncResult<T>>): Subscriber<T>

Adapts a Vert.x Handler<AsyncResult<T>> to an RxJava Subscriber.

The returned subscriber can be passed to Observable#subscribe(Subscriber) or rx.Single#subscribe(Subscriber).

unmarshaller

open static fun <T : Any> unmarshaller(mappedType: Class<T>, mapper: ObjectMapper): Operator<T, Buffer>

Returns an unmarshaller for the specified Java type as an rx.Observable.Operator instance, using the provided com.fasterxml.jackson.databind.ObjectMapper. The unmarshaller can be used with the Observable#lift(rx.Observable.Operator) method to transform an Observable<Buffer> into an Observable<T>. The unmarshaller buffers the content until onComplete is called, then unmarshalling happens. Note that the returned observable will emit at most a single object.

open static fun <T : Any> unmarshaller(mappedType: Class<T>): Operator<T, Buffer>

Returns a JSON unmarshaller for the specified Java type as an rx.Observable.Operator instance. The unmarshaller can be used with the Observable#lift(rx.Observable.Operator) method to transform an Observable<Buffer> into an Observable<T>. The unmarshaller buffers the content until onComplete is called, then unmarshalling happens. Note that the returned observable will emit at most a single object.

open static fun <T : Any> unmarshaller(mappedTypeRef: TypeReference<T>): Operator<T, Buffer>

Returns a JSON unmarshaller for the specified Java type as an rx.Observable.Operator instance. The unmarshaller can be used with the rx.Observable#lift(rx.Observable.Operator) method to transform an Observable<Buffer> into an Observable<T>. The unmarshaller buffers the content until onComplete is called, then unmarshalling happens. Note that the returned observable will emit at most a single object.

open static fun <T : Any> unmarshaller(mappedTypeRef: TypeReference<T>, mapper: ObjectMapper): Operator<T, Buffer>

Returns an unmarshaller for the specified Java type as an rx.Observable.Operator instance, using the provided com.fasterxml.jackson.databind.ObjectMapper. The unmarshaller can be used with the rx.Observable#lift(rx.Observable.Operator) method to transform an Observable<Buffer> into an Observable<T>. The unmarshaller buffers the content until onComplete is called, then unmarshalling happens. Note that the returned observable will emit at most a single object.
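The "buffer until completion, then unmarshal once" behaviour shared by the unmarshaller overloads can be sketched with a plain-JDK model. This is an illustration only, not the operator's implementation: BufferingDecoder is a hypothetical name, the decoder function stands in for Jackson, byte arrays stand in for Buffer, and returning an empty result when no bytes arrived is an assumption of the model.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Function;

// Plain-JDK model of "buffer every chunk, decode once on completion".
// BufferingDecoder is a hypothetical name; the decoder stands in for Jackson.
final class BufferingDecoder<T> {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final Function<byte[], T> decoder;

    BufferingDecoder(Function<byte[], T> decoder) {
        this.decoder = decoder;
    }

    // Plays the role of receiving one Buffer: accumulates bytes, emits nothing.
    void onNext(byte[] chunk) {
        buffer.write(chunk, 0, chunk.length);
    }

    // Plays the role of completion: decodes everything received so far.
    // Assumption: with no input there is nothing to emit, hence "at most one" object.
    Optional<T> onComplete() {
        if (buffer.size() == 0) {
            return Optional.empty();
        }
        return Optional.ofNullable(decoder.apply(buffer.toByteArray()));
    }

    public static void main(String[] args) {
        // The decoder here just rebuilds the text; a real one would parse JSON.
        BufferingDecoder<String> decoder =
            new BufferingDecoder<>(bytes -> new String(bytes, StandardCharsets.UTF_8));
        decoder.onNext("{\"name\":".getBytes(StandardCharsets.UTF_8));
        decoder.onNext("\"vertx\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(decoder.onComplete().orElse("<nothing>")); // prints {"name":"vertx"}
    }
}
```

The model shows why a document split across several chunks still yields one object: no chunk is decoded on its own, so chunk boundaries do not need to align with the structure of the payload.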