package reactmann.subscribers;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import rx.Observable;
import rx.Subscriber;

/* loaded from: input_file:reactmann/subscribers/BufferAction.class */
public class BufferAction implements Observable.OnSubscribe<Buffer> {
    private int size = -1;
    private RecordParser recordParser;
    private final Observable<Buffer> observable;

    public BufferAction(Observable<Buffer> observable) {
        this.observable = observable;
    }

    public void call(Subscriber<? super Buffer> subscriber) {
        this.observable.subscribe(buffer -> {
            if (this.recordParser == null) {
                this.recordParser = RecordParser.newFixed(1, buffer -> {
                    subscriber.onNext(buffer);
                    this.size = -1;
                });
            }
            if (this.size == -1) {
                this.size = buffer.getInt(0);
                this.recordParser.fixedSizeMode(this.size + 4);
            }
            this.recordParser.handle(buffer);
        });
    }
}
