/*
 * Decompiled with CFR 0.152.
 */
package nakadi;

import java.util.List;
import nakadi.MetricCollector;
import nakadi.StreamBatchRecord;
import nakadi.StreamObserver;
import nakadi.shadow.io.reactivex.subscribers.ResourceSubscriber;

/**
 * Subscriber that receives buffered lists of {@link StreamBatchRecord}s, marks receive
 * metrics for every record, and forwards each record and lifecycle signal to a
 * {@link StreamObserver}.
 *
 * @param <T> the type parameter of the batch records handled by the observer
 */
class StreamBatchRecordBufferingSubscriber<T>
extends ResourceSubscriber<List<StreamBatchRecord<T>>> {
    private final StreamObserver<T> observer;
    private final MetricCollector metricCollector;

    /**
     * Creates a subscriber that delegates to the given observer.
     *
     * @param observer the observer that is notified of start, records, errors and completion
     * @param metricCollector the collector on which receive meters are marked
     */
    StreamBatchRecordBufferingSubscriber(StreamObserver<T> observer, MetricCollector metricCollector) {
        this.observer = observer;
        this.metricCollector = metricCollector;
    }

    /**
     * Notifies the observer that the stream is starting and then lets the base class
     * perform its default start-up.
     */
    @Override
    protected void onStart() {
        // Initialise the observer BEFORE delegating to the base class: per the RxJava
        // ResourceSubscriber documentation the default onStart() issues the initial
        // request, which may cause records to be delivered to onNext() right away.
        // Calling the observer first guarantees it sees onStart() before any record.
        this.observer.onStart();
        super.onStart();
    }

    /**
     * Marks metrics for each record in the buffer, hands each record to the observer in
     * list order, and finally requests more items if the observer asks for back pressure.
     *
     * @param records the buffered records to process
     */
    @Override
    public void onNext(List<StreamBatchRecord<T>> records) {
        for (StreamBatchRecord<T> record : records) {
            this.markReceived(record);
            this.observer.onNext(record);
        }
        // The observer may return an item count; when present it is requested from upstream.
        this.observer.requestBackPressure().ifPresent(this::request);
    }

    /**
     * Forwards the error to the observer.
     *
     * @param e the error signalled to this subscriber
     */
    @Override
    public void onError(Throwable e) {
        this.observer.onError(e);
    }

    /**
     * Forwards the completion signal to the observer.
     */
    @Override
    public void onComplete() {
        this.observer.onCompleted();
    }

    /**
     * Marks the receive meters for one record: a non-empty batch marks one received batch
     * plus the number of events it holds, an empty batch marks one keepalive.
     */
    private void markReceived(StreamBatchRecord<T> record) {
        if (record.streamBatch().isEmpty()) {
            this.metricCollector.mark(MetricCollector.Meter.receivedKeepalive, 1L);
            return;
        }
        this.metricCollector.mark(MetricCollector.Meter.receivedBatch, 1L);
        this.metricCollector.mark(MetricCollector.Meter.received, record.streamBatch().events().size());
    }
}

