package reactmann.subscribers;

import io.vertx.core.buffer.Buffer;
import rx.Observable;
import rx.Subscriber;

/* loaded from: input_file:reactmann/subscribers/BufferAction.class */
public class BufferAction implements Observable.OnSubscribe<Buffer> {
    private RiemannParser recordParser;
    private final Observable<Buffer> observable;

    public BufferAction(Observable<Buffer> observable) {
        this.observable = observable;
    }

    public void call(Subscriber<? super Buffer> subscriber) {
        this.observable.subscribe(buffer -> {
            if (this.recordParser == null) {
                this.recordParser = RiemannParser.newFixed(buffer -> {
                    subscriber.onNext(buffer);
                });
            }
            this.recordParser.handle(buffer);
        });
    }
}
