/*
 * Decompiled with CFR 0.152.
 */
package nakadi;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import nakadi.NakadiClient;
import nakadi.NakadiException;
import nakadi.StreamCursorContext;
import nakadi.StreamOffsetObserver;
import nakadi.Unstable;
import nakadi.shadow.com.google.common.util.concurrent.ThreadFactoryBuilder;
import nakadi.shadow.io.reactivex.Flowable;
import nakadi.shadow.io.reactivex.processors.PublishProcessor;
import nakadi.shadow.io.reactivex.schedulers.Schedulers;
import nakadi.shadow.org.reactivestreams.Subscriber;
import nakadi.shadow.org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Relays {@link StreamCursorContext} values to subscribed {@link StreamOffsetObserver}s.
 *
 * <p>Values handed to {@link #onNext(StreamCursorContext)} are pushed into a
 * {@link PublishProcessor}; subscribers attached via {@link #subscribe(StreamOffsetObserver)}
 * receive them through a {@link Flowable} that is observed on a scheduler backed by a
 * single-thread executor. That executor is a static field, so it is shared by every
 * instance of this class.
 */
@Unstable
public class SubscriptionOffsetPublisher
implements StreamOffsetObserver {
    /** Logger registered under the simple name of {@link NakadiClient}. */
    private static final Logger logger = LoggerFactory.getLogger(NakadiClient.class.getSimpleName());

    /**
     * Single-thread executor that runs observer callbacks. Its threads are named
     * {@code nakadi-java-stream-observer-%d} and log uncaught exceptions as
     * {@code stream_observer_err}.
     */
    private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(
        new ThreadFactoryBuilder()
            .setUncaughtExceptionHandler(
                (t, e) -> logger.error("stream_observer_err {}, {}", t, e.getMessage(), e))
            .setNameFormat("nakadi-java-stream-observer-%d")
            .build());

    /** Entry point of the pipeline; receives every cursor context passed to {@code onNext}. */
    private final PublishProcessor<StreamCursorContext> processor = PublishProcessor.create();

    /** View of {@link #processor} whose downstream signals are observed on {@link #EXECUTOR}. */
    private final Flowable<StreamCursorContext> flowable =
        Flowable.fromPublisher(processor).observeOn(Schedulers.from(EXECUTOR));

    /** Instances are obtained through {@link #create()}. */
    private SubscriptionOffsetPublisher() {
    }

    /**
     * Creates a new publisher with its own processor.
     *
     * @return a new publisher instance
     */
    static SubscriptionOffsetPublisher create() {
        return new SubscriptionOffsetPublisher();
    }

    /**
     * Pushes the given cursor context into the processor for delivery to subscribers.
     *
     * @param streamCursorContext the cursor context to publish
     * @throws NakadiException declared by {@link StreamOffsetObserver}; nothing in this
     *     method throws it directly
     */
    @Override
    public void onNext(StreamCursorContext streamCursorContext) throws NakadiException {
        processor.onNext(streamCursorContext);
    }

    /**
     * Attaches an observer that will be called for each published cursor context.
     *
     * @param streamOffsetObserver the observer to notify; must not be {@code null}
     * @return this publisher, to allow call chaining
     * @throws NullPointerException if {@code streamOffsetObserver} is {@code null}
     */
    SubscriptionOffsetPublisher subscribe(StreamOffsetObserver streamOffsetObserver) {
        // The subscriber is built first so a null observer is rejected before anything
        // is attached to the flowable.
        StreamCursorContextSubscriber subscriber =
            new StreamCursorContextSubscriber(streamOffsetObserver);
        flowable.subscribe(subscriber);
        return this;
    }

    /** Signals completion to the underlying processor. */
    void onComplete() {
        processor.onComplete();
    }

    /**
     * Reactive Streams subscriber that forwards each received cursor context to a
     * {@link StreamOffsetObserver} and logs terminal signals.
     */
    class StreamCursorContextSubscriber
    implements Subscriber<StreamCursorContext> {
        /** Observer that receives every cursor context delivered to this subscriber. */
        final StreamOffsetObserver streamOffsetObserver;

        /**
         * @param streamOffsetObserver the observer to forward to; must not be {@code null}
         * @throws NullPointerException if {@code streamOffsetObserver} is {@code null}
         */
        StreamCursorContextSubscriber(StreamOffsetObserver streamOffsetObserver) {
            // Fail at construction rather than on the first delivered item.
            this.streamOffsetObserver =
                Objects.requireNonNull(streamOffsetObserver, "streamOffsetObserver");
        }

        /** Requests an unbounded number of items ({@code Long.MAX_VALUE}). */
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
        }

        /**
         * Forwards the cursor context to the wrapped observer.
         *
         * <p>NOTE(review): exceptions thrown by the observer are not caught here;
         * presumably they end up in the executor thread's uncaught-exception handler.
         * Confirm how the stream behaves after such a failure.
         */
        @Override
        public void onNext(StreamCursorContext streamCursorContext) {
            streamOffsetObserver.onNext(streamCursorContext);
        }

        /** Logs the error; no other action is taken. */
        @Override
        public void onError(Throwable t) {
            logger.error("SubscriptionOffsetPublisher.subscriber onError " + t.getMessage(), t);
        }

        /** Logs the completion; no other action is taken. */
        @Override
        public void onComplete() {
            logger.info("SubscriptionOffsetPublisher.subscriber onComplete ");
        }
    }
}

