/*
 * Decompiled with CFR 0.152.
 */
package nakadi;

import java.util.List;
import java.util.Optional;
import nakadi.NakadiClient;
import nakadi.NakadiException;
import nakadi.Problem;
import nakadi.StreamBatch;
import nakadi.StreamBatchRecord;
import nakadi.StreamCursorContext;
import nakadi.StreamObserverBackPressure;
import nakadi.StreamOffsetObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Stream observer that logs its lifecycle callbacks and every event it receives.
 *
 * <p>For each non-empty batch the events are logged one by one and the batch's cursor context
 * is then passed to the record's {@link StreamOffsetObserver}. Empty batches are logged as
 * keepalives and are not forwarded to the offset observer. Both back-pressure hooks return
 * {@link Optional#empty()}.
 */
public class LoggingStreamObserver extends StreamObserverBackPressure<String> {
    /** MDC key under which the current batch's cursor context is exposed while it is processed. */
    private static final String MDC_CURSOR_CONTEXT = "cursor_context";

    // The logger is named after NakadiClient's simple name, not after this class.
    private static final Logger logger = LoggerFactory.getLogger(NakadiClient.class.getSimpleName());

    /**
     * Running count of events logged by {@link #onNext}. The field is static, so it is shared by
     * every instance of this class.
     *
     * <p>NOTE(review): the increment in {@code onNext} is not synchronized; if observers run on
     * more than one thread the count may be inaccurate. Confirm whether that matters before
     * relying on it.
     */
    static int eventCount = 0;

    /** Logs that the observer has been started. */
    @Override
    public void onStart() {
        logger.info("LoggingStreamObserver.onStart");
    }

    /** Logs that the observer has been stopped. */
    @Override
    public void onStop() {
        logger.info("LoggingStreamObserver.onStop");
    }

    /** Logs that the stream has completed. */
    @Override
    public void onCompleted() {
        logger.info("LoggingStreamObserver.onCompleted");
    }

    /**
     * Logs the error at warn level, including its stack trace, and restores the thread's
     * interrupt flag when the error is an {@link InterruptedException}.
     *
     * @param e the error reported for the stream
     */
    @Override
    public void onError(Throwable e) {
        // The trailing throwable has no matching placeholder, so SLF4J logs it as the exception
        // (with stack trace) instead of formatting it into the message.
        logger.warn("LoggingStreamObserver.onError {}", e.getMessage(), e);
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Logs the contents of a batch and, for non-empty batches, hands the cursor context to the
     * record's offset observer once every event has been logged.
     *
     * @param record the batch together with its cursor context and offset observer
     * @throws NakadiException rethrown unchanged if raised during processing; any other
     *     {@link Exception} is wrapped in a {@code NakadiException} carrying a local problem and
     *     the original exception as its cause
     */
    @Override
    public void onNext(StreamBatchRecord<String> record) {
        try {
            StreamOffsetObserver offsetObserver = record.streamOffsetObserver();
            StreamBatch<String> batch = record.streamBatch();
            StreamCursorContext context = record.streamCursorContext();

            MDC.put(MDC_CURSOR_CONTEXT, context.toString());
            try {
                if (batch.isEmpty()) {
                    logger.info("LoggingStreamObserver: keepalive");
                } else {
                    List<String> events = batch.events();
                    logger.info("LoggingStreamObserver events processing count {} =====================================", events.size());
                    for (String event : events) {
                        logger.info("LoggingStreamObserver received count {} event: {}", ++eventCount, event);
                    }
                    // Forward the cursor only after all events of the batch have been logged.
                    offsetObserver.onNext(context);
                    logger.info("LoggingStreamObserver events processed =====================================");
                }
            } finally {
                // Always clear the MDC entry so it does not leak into unrelated log lines.
                MDC.remove(MDC_CURSOR_CONTEXT);
            }
        } catch (NakadiException e) {
            // Caught separately so it is not wrapped a second time by the generic handler below.
            throw e;
        } catch (Exception e) {
            throw new NakadiException(Problem.localProblem(e.getMessage(), ""), e);
        }
    }

    /**
     * @return always {@link Optional#empty()}
     */
    @Override
    public Optional<Long> requestBackPressure() {
        return Optional.empty();
    }

    /**
     * @return always {@link Optional#empty()}
     */
    @Override
    public Optional<Integer> requestBuffer() {
        return Optional.empty();
    }
}

