public abstract class RxRatpack extends Object
IMPORTANT: the initialize() method must be called to fully enable integration.
| Constructor and Description |
|---|
| RxRatpack() |
| Modifier and Type | Method and Description |
|---|---|
| static void | initialize() Registers an RxJavaObservableExecutionHook with RxJava that provides a default error handling strategy of forwarding exceptions to the exec context. |
| static <T> Observable<T> | observe(Promise<T> promise) Converts a Ratpack promise into an Rx observable. |
| static <T,I extends Iterable<T>> Observable<T> | observeEach(Promise<I> promise) Converts a Ratpack promise of an iterable value into an Rx observable for each element of the promised iterable. |
public static void initialize()
Registers an RxJavaObservableExecutionHook with RxJava that provides a default error handling strategy of forwarding exceptions to the exec context.
This method is idempotent. It only needs to be called once per JVM, regardless of how many Ratpack applications are running within the JVM.
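To illustrate what idempotent means here, the following is a minimal plain-Java sketch of a once-per-JVM guard. It is not RxRatpack's actual implementation: the class name IdempotentInit and the REGISTRATIONS counter are hypothetical, and the counter merely stands in for the one-off hook registration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in; NOT the RxRatpack implementation.
final class IdempotentInit {
    // Flips from false to true exactly once per class loader.
    private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);

    // Counts how often the one-off work actually ran (assumption for illustration).
    static final AtomicInteger REGISTRATIONS = new AtomicInteger();

    private IdempotentInit() {}

    // Safe to call from any thread, any number of times.
    static void initialize() {
        // compareAndSet succeeds for exactly one caller; all others skip the body.
        if (INITIALIZED.compareAndSet(false, true)) {
            REGISTRATIONS.incrementAndGet(); // stands in for registering the hook
        }
    }

    public static void main(String[] args) {
        initialize();
        initialize();
        System.out.println(REGISTRATIONS.get()); // prints 1
    }
}
```

With a guard of this kind, calling initialize() repeatedly, for example from several applications in one JVM, is harmless.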
For a Java application, a convenient place to call this is in the handler factory implementation.
import ratpack.launch.HandlerFactory;
import ratpack.launch.LaunchConfig;
import ratpack.handling.Handler;
import ratpack.handling.Handlers;
import ratpack.handling.Context;
import ratpack.handling.ChainAction;
import ratpack.error.ServerErrorHandler;
import ratpack.registry.RegistrySpecAction;
import ratpack.rx.RxRatpack;
import rx.Observable;
import rx.functions.Action1;
public class MyHandlerFactory implements HandlerFactory {
  public Handler create(LaunchConfig launchConfig) {
    // Enable Rx integration
    RxRatpack.initialize();

    return Handlers.chain(launchConfig, new ChainAction() {
      public void execute() {
        register(new RegistrySpecAction() { // register a custom error handler
          public void execute() {
            add(ServerErrorHandler.class, new ServerErrorHandler() {
              public void error(Context context, Exception exception) {
                context.render("caught by error handler!");
              }
            });
          }
        });

        get(new Handler() {
          public void handle(Context context) {
            // An observable sequence with no defined error handler
            // The error will be propagated to the context error handler implicitly
            Observable.<String>error(new Exception("!")).subscribe(new Action1<String>() {
              public void call(String str) {
                // will never be called
              }
            });
          }
        });
      }
    });
  }
}
// Test (Groovy) …
import ratpack.launch.LaunchConfig
import ratpack.launch.LaunchConfigBuilder
import ratpack.test.embed.LaunchConfigEmbeddedApplication
import static ratpack.groovy.test.TestHttpClients.testHttpClient

def app = new LaunchConfigEmbeddedApplication() {
  protected LaunchConfig createLaunchConfig() {
    LaunchConfigBuilder.noBaseDir().build(new MyHandlerFactory())
  }
}

def client = testHttpClient(app)

try {
  // The error handler registered in MyHandlerFactory renders the response
  assert client.getText() == "caught by error handler!"
} finally {
  app.close()
}
For a Groovy DSL application, initialize() can be called while registering the module bindings.
import ratpack.handling.Context
import ratpack.error.ServerErrorHandler
import ratpack.rx.RxRatpack
import rx.Observable
import static ratpack.groovy.test.embed.EmbeddedApplications.embeddedApp
import static ratpack.groovy.test.TestHttpClients.testHttpClient
def app = embeddedApp {
  modules {
    // Enable Rx integration
    RxRatpack.initialize()

    bind ServerErrorHandler, new ServerErrorHandler() {
      void error(Context context, Exception exception) {
        context.render("caught by error handler!")
      }
    }
  }

  handlers {
    get {
      // An observable sequence with no defined error handler
      // The error will be propagated to the context error handler implicitly
      Observable.error(new Exception("!")).subscribe {
        // will never happen
      }
    }
  }
}

def client = testHttpClient(app)

try {
  // The bound ServerErrorHandler renders the response
  assert client.getText() == "caught by error handler!"
} finally {
  app.close()
}
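The default strategy that initialize() installs, as described above, can be modelled without either library. The sketch below is a hypothetical plain-Java model, not RxJava's hook API or Ratpack's types: ErrorSink plays the role of the context error handler, and FailingSource behaves like Observable.error(...). A subscriber that supplies no error callback has the failure forwarded to the sink, while a subscriber with its own error callback handles it itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of "forward unhandled errors to the context"; NOT RxJava or Ratpack API.
final class DefaultErrorForwarding {
    private DefaultErrorForwarding() {}

    // Plays the role of the execution context's error handler.
    interface ErrorSink {
        void error(Throwable failure);
    }

    // A source that never emits a value and always fails.
    static final class FailingSource<T> {
        private final Throwable failure;
        private final ErrorSink contextHandler;

        FailingSource(Throwable failure, ErrorSink contextHandler) {
            this.failure = failure;
            this.contextHandler = contextHandler;
        }

        // No error callback given: the failure goes to the context handler.
        void subscribe(Consumer<T> onNext) {
            contextHandler.error(failure);
        }

        // Explicit error callback given: the context handler is not involved.
        void subscribe(Consumer<T> onNext, Consumer<Throwable> onError) {
            onError.accept(failure);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        FailingSource<String> source =
            new FailingSource<>(new Exception("!"), t -> log.add("context: " + t.getMessage()));

        source.subscribe(s -> log.add("next: " + s));
        source.subscribe(s -> log.add("next: " + s), t -> log.add("subscriber: " + t.getMessage()));

        System.out.println(log); // prints [context: !, subscriber: !]
    }
}
```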
public static <T> Observable<T> observe(Promise<T> promise)
Converts a Ratpack promise into an Rx observable.
For example, this can be used to observe blocking operations.
In Java…
import ratpack.handling.Handler;
import ratpack.handling.Context;
import ratpack.exec.Promise;
import java.util.concurrent.Callable;
import rx.functions.Func1;
import rx.functions.Action1;
import static ratpack.rx.RxRatpack.observe;
public class ReactiveHandler implements Handler {
  // context is final so the anonymous subscriber below can capture it
  public void handle(final Context context) {
    Promise<String> promise = context.blocking(new Callable<String>() {
      public String call() {
        // do some blocking IO here
        return "hello world";
      }
    });

    observe(promise).map(new Func1<String, String>() {
      public String call(String input) {
        return input.toUpperCase();
      }
    }).subscribe(new Action1<String>() {
      public void call(String str) {
        context.render(str); // renders: HELLO WORLD
      }
    });
  }
}
A similar example in the Groovy DSL would look like:
import static ratpack.rx.RxRatpack.observe;
handler {
  observe(blocking {
    // do some blocking IO
    "hello world"
  }) map { String input ->
    input.toUpperCase()
  } subscribe {
    render it // renders: HELLO WORLD
  }
}
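Type Parameters: T - the type of value promised
Parameters: promise - the promise
public static <T,I extends Iterable<T>> Observable<T> observeEach(Promise<I> promise)
Converts a Ratpack promise of an iterable value into an Rx observable for each element of the promised iterable.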
The promised iterable will be emitted to the observer one element at a time.
For example, this can be used to observe background operations that produce some kind of iterable…
import static ratpack.rx.RxRatpack.observeEach
handler {
  observeEach(blocking {
    // do some blocking IO and return a List<String>
    // each item in the List is emitted to the next Observable, not the List
    ["a", "b", "c"]
  }) map { String input ->
    input.toUpperCase()
  } subscribe {
    println it
  }
}
The output would be:
A
B
C
Type Parameters: T - the element type of the promised iterable; I - the type of iterable
Parameters: promise - the promise
See Also: observe(ratpack.exec.Promise)
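The element-by-element behaviour of observeEach can be approximated with the JDK alone. The following sketch swaps in java.util.concurrent.CompletableFuture for the Ratpack promise and a plain Consumer for the Rx subscriber, so it illustrates the semantics only. The class EachElementSketch and the method forEachPromised are hypothetical names, not part of Ratpack or RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical JDK-only analogue of observeEach; NOT Ratpack or RxJava API.
final class EachElementSketch {
    private EachElementSketch() {}

    // Once the promised iterable is available, hands its elements to onNext
    // one at a time, in iteration order, rather than handing over the iterable.
    static <T, I extends Iterable<T>> CompletableFuture<Void> forEachPromised(
            CompletableFuture<I> promise, Consumer<T> onNext) {
        return promise.thenAccept(items -> {
            for (T item : items) {
                onNext.accept(item);
            }
        });
    }

    public static void main(String[] args) {
        // Stands in for a blocking operation that yields a List<String>.
        CompletableFuture<List<String>> promise =
            CompletableFuture.supplyAsync(() -> Arrays.asList("a", "b", "c"));

        List<String> seen = new ArrayList<>();
        // join() waits until every element has been delivered.
        forEachPromised(promise, s -> seen.add(s.toUpperCase())).join();

        System.out.println(seen); // prints [A, B, C]
    }
}
```

The generic signature mirrors the one documented for observeEach (T for the element type, I for the iterable type), which lets the compiler infer the element type from the promised collection.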