/*
 * Decompiled with CFR 0.152.
 */
package nakadi;

import nakadi.ExceptionSupport;
import nakadi.MetricCollector;
import nakadi.NakadiClient;
import nakadi.NonRetryableNakadiException;
import nakadi.Problem;
import nakadi.RetryableException;
import nakadi.StreamBatchRecord;
import nakadi.StreamObserver;
import nakadi.shadow.io.reactivex.subscribers.ResourceSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StreamBatchRecordSubscriber<T>
extends ResourceSubscriber<StreamBatchRecord<T>> {
    private static final Logger logger = LoggerFactory.getLogger((String)NakadiClient.class.getSimpleName());
    private final StreamObserver<T> observer;
    private final MetricCollector metricCollector;
    private boolean done;

    StreamBatchRecordSubscriber(StreamObserver<T> observer, MetricCollector metricCollector) {
        this.observer = observer;
        this.metricCollector = metricCollector;
    }

    @Override
    protected void onStart() {
        super.onStart();
        logger.debug("StreamBatchRecordSubscriber.onStart");
        this.observer.onBegin();
    }

    @Override
    public void onNext(StreamBatchRecord<T> record) {
        if (this.done) {
            return;
        }
        if (record == null) {
            NullPointerException npe = new NullPointerException("onNext called with null batch record. Null values are not expected from stream processors.");
            this.onError(npe);
            throw npe;
        }
        try {
            if (!record.streamBatch().isEmpty()) {
                this.metricCollector.mark(MetricCollector.Meter.receivedBatch, 1L);
                this.metricCollector.mark(MetricCollector.Meter.received, record.streamBatch().events().size());
            } else {
                this.metricCollector.mark(MetricCollector.Meter.receivedKeepalive, 1L);
            }
            this.observer.onNext(record);
            this.observer.requestBackPressure().ifPresent(this::request);
        }
        catch (RetryableException e) {
            logger.warn("StreamBatchRecordSubscriber.retryable_exception msg=" + e.getMessage(), (Throwable)e);
        }
        catch (NonRetryableNakadiException e) {
            logger.warn("StreamBatchRecordSubscriber.non_retryable_exception msg=" + e.getMessage());
            this.onError(e);
            throw e;
        }
        catch (Throwable t) {
            if (t instanceof Error) {
                logger.error("StreamBatchRecordSubscriber.detected_error msg={}", (Object)t.getMessage());
                this.onError(t);
                throw (Error)t;
            }
            if (!ExceptionSupport.isConsumerStreamRetryable(t)) {
                logger.error(String.format("StreamBatchRecordSubscriber.detected_nonretryable_exception type=%s msg=%s", t.getClass().getSimpleName(), t.getMessage()));
                this.onError(t);
                throw t;
            }
            logger.info(String.format("StreamBatchRecordSubscriber.detected_retryable_exception type=%s msg=%s", t.getClass().getSimpleName(), t.getMessage()));
        }
    }

    @Override
    public void onError(Throwable e) {
        logger.error("StreamBatchRecordSubscriber.onError " + e.getMessage());
        if (this.done) {
            logger.warn("observer_on_error_exception msg=onError_already_called");
            return;
        }
        this.done = true;
        try {
            this.observer.onError(e);
        }
        catch (Exception e1) {
            throw new NonRetryableNakadiException(Problem.localProblem("observer_on_error_exception", "observer.onError_threw_exception"), (Throwable)e1);
        }
    }

    @Override
    public void onComplete() {
        logger.info("StreamBatchRecordSubscriber.onCompleted");
        this.observer.onCompleted();
    }
}

