package net.oneandone.reactive.sse.servlet;

import java.util.concurrent.ForkJoinPool;
import javax.servlet.ServletInputStream;
import net.oneandone.reactive.sse.ServerSentEvent;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:net/oneandone/reactive/sse/servlet/ServletSsePublisher.class */
/**
 * Reactive Streams {@link Publisher} that emits the {@link ServerSentEvent}s read from a
 * {@link ServletInputStream}.
 *
 * <p>The publisher supports exactly one subscriber over its lifetime. Any further call to
 * {@link #subscribe(Subscriber)} is rejected with an {@link IllegalStateException} delivered
 * through {@code onError}.
 */
public class ServletSsePublisher implements Publisher<ServerSentEvent> {
    private final ServletInputStream in;
    // Guarded by "this". Set on the first subscribe() call and never reset.
    private boolean subscribed = false;

    /**
     * Subscription handed to the single subscriber. It owns the {@link SseReadableChannel}
     * built on top of the servlet input stream and routes every channel callback through a
     * {@link SubscriberProxy} so that terminal signals are delivered at most once.
     */
    public static final class SEEEventReaderSubscription implements Subscription {
        private final SubscriberProxy subscriberProxy;
        private final SseReadableChannel channel;

        /**
         * Serializing wrapper around the real subscriber. Once a terminal signal
         * ({@code onError} or {@code onComplete}) has been forwarded, all later signals are
         * dropped and the channel is closed.
         */
        private final class SubscriberProxy implements Subscriber<ServerSentEvent> {
            private final Subscriber<? super ServerSentEvent> subscriber;
            // Guarded by the proxy monitor; false once a terminal signal has been forwarded.
            private boolean open = true;

            public SubscriberProxy(Subscriber<? super ServerSentEvent> subscriber) {
                this.subscriber = subscriber;
            }

            public void onSubscribe(Subscription subscription) {
                // Intentionally empty: the real subscriber receives its Subscription
                // directly from ServletSsePublisher.subscribe, not through this proxy.
            }

            /**
             * Forwards an event while the proxy is open. A {@link RuntimeException} thrown
             * by the subscriber is turned into a terminal {@code onError}.
             */
            public synchronized void onNext(ServerSentEvent serverSentEvent) {
                if (!this.open) {
                    return;
                }
                try {
                    this.subscriber.onNext(serverSentEvent);
                } catch (RuntimeException e) {
                    onError(e);
                }
            }

            /** Forwards the failure once, then closes the channel. */
            public synchronized void onError(Throwable th) {
                if (!this.open) {
                    return;
                }
                this.open = false;
                try {
                    this.subscriber.onError(th);
                } finally {
                    // Release the channel even if the subscriber's callback throws.
                    closeChannelQuietly();
                }
            }

            /** Forwards completion once, then closes the channel. */
            public synchronized void onComplete() {
                if (!this.open) {
                    return;
                }
                this.open = false;
                try {
                    this.subscriber.onComplete();
                } finally {
                    // Release the channel even if the subscriber's callback throws.
                    closeChannelQuietly();
                }
            }

            /** Returns {@code true} until a terminal signal has been forwarded. */
            synchronized boolean isOpen() {
                return this.open;
            }
        }

        /**
         * Creates the subscription and the underlying channel.
         *
         * @param servletInputStream stream the events are read from
         * @param subscriber         downstream subscriber that receives the events
         */
        public SEEEventReaderSubscription(ServletInputStream servletInputStream, Subscriber<? super ServerSentEvent> subscriber) {
            this.subscriberProxy = new SubscriberProxy(subscriber);
            this.channel = new SseReadableChannel(
                    servletInputStream,
                    event -> this.subscriberProxy.onNext(event),
                    error -> this.subscriberProxy.onError(error),
                    ignored -> this.subscriberProxy.onComplete());
        }

        /**
         * Cancels the subscription by terminating the proxy, which closes the channel.
         *
         * <p>NOTE(review): this also signals {@code onComplete} to the downstream subscriber.
         * That behavior is kept as-is because existing callers may rely on it — confirm
         * whether a silent close is preferred.
         */
        public void cancel() {
            this.subscriberProxy.onComplete();
        }

        /**
         * Requests up to {@code j} further events. The channel is driven asynchronously on
         * the common {@link ForkJoinPool}.
         *
         * <p>A non-positive demand is reported through {@code onError} with an
         * {@link IllegalArgumentException}, as required by Reactive Streams rule 3.9.
         *
         * @param j number of additional events requested; must be positive
         */
        public void request(long j) {
            if (j <= 0) {
                this.subscriberProxy.onError(
                        new IllegalArgumentException("non-positive subscription request: " + j));
                return;
            }
            ForkJoinPool.commonPool().execute(() -> requestNext(j));
        }

        /**
         * Asks the channel for the next event up to {@code j} times, stopping early once the
         * subscription has terminated (error, completion or cancel).
         */
        private void requestNext(long j) {
            // long counter: an int would wrap around for demands above Integer.MAX_VALUE
            // (e.g. Long.MAX_VALUE) and the loop would never terminate.
            for (long consumed = 0; consumed < j && this.subscriberProxy.isOpen(); consumed++) {
                try {
                    this.channel.consumeNextEvent();
                } catch (RuntimeException e) {
                    this.subscriberProxy.onError(e);
                }
            }
        }

        /** Best-effort close of the channel; failures while closing are deliberately ignored. */
        private void closeChannelQuietly() {
            SseReadableChannel current = this.channel;
            if (current == null) {
                // A callback fired while the channel was still being constructed, so the
                // field has not been assigned yet and there is nothing to close from here.
                return;
            }
            try {
                current.close();
            } catch (RuntimeException ignored) {
                // The stream is already terminating; a failing close cannot be reported
                // to anyone at this point.
            }
        }
    }

    /** No-op subscription handed to subscribers that are rejected right away. */
    private static final class RejectedSubscription implements Subscription {
        public void request(long n) {
            // Nothing to deliver: the subscriber is about to receive onError.
        }

        public void cancel() {
            // Nothing to release.
        }
    }

    /**
     * @param servletInputStream the stream the server-sent events are read from
     */
    public ServletSsePublisher(ServletInputStream servletInputStream) {
        this.in = servletInputStream;
    }

    /**
     * Subscribes the given subscriber. The first subscriber receives a
     * {@link SEEEventReaderSubscription}; every later one receives {@code onSubscribe} with a
     * no-op subscription followed by {@code onError(IllegalStateException)}, as Reactive
     * Streams rule 1.9 requires {@code onSubscribe} to precede any other signal.
     *
     * @param subscriber the subscriber to register
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    public void subscribe(Subscriber<? super ServerSentEvent> subscriber) {
        if (subscriber == null) {
            // Fail before touching any state so a bad call does not use up the single slot.
            throw new NullPointerException("subscriber must not be null");
        }

        boolean alreadySubscribed;
        synchronized (this) {
            alreadySubscribed = this.subscribed;
            this.subscribed = true;
        }

        // Subscriber callbacks run outside the lock so foreign code never executes while
        // the publisher monitor is held.
        if (alreadySubscribed) {
            subscriber.onSubscribe(new RejectedSubscription());
            subscriber.onError(new IllegalStateException("subscription already exists. Multi-subscribe is not supported"));
        } else {
            subscriber.onSubscribe(new SEEEventReaderSubscription(this.in, subscriber));
        }
    }
}
