package net.oneandone.reactive.sse.servlet;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import javax.servlet.ServletOutputStream;
import net.oneandone.reactive.sse.ServerSentEvent;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:net/oneandone/reactive/sse/servlet/ServletSseSubscriber.class */
/**
 * Reactive Streams {@link Subscriber} that forwards every received
 * {@link ServerSentEvent} to an {@link SseWriteableChannel} built on top of a
 * {@link ServletOutputStream}.
 *
 * <p>Demand handling: 25 events are requested when the subscription arrives, and one
 * further event is requested each time an asynchronous write completes normally.
 *
 * <p>The subscriber is closed at most once (guarded by {@code isOpen}); closing cancels
 * the current subscription and closes the channel.
 */
public class ServletSseSubscriber implements Subscriber<ServerSentEvent> {
    private static final Logger LOG = Logger.getLogger(ServletSseSubscriber.class.getName());
    /** {@code true} until {@link #close()} has run once. */
    private final AtomicBoolean isOpen;
    /** Current subscription; holds an {@link IllegalStateSubscription} until {@link #onSubscribe} is called. */
    private final AtomicReference<Subscription> subscriptionRef;
    private final SseWriteableChannel channel;

    /**
     * Placeholder subscription installed before {@link #onSubscribe} has been called.
     * Every operation fails with {@link IllegalStateException}.
     */
    private static final class IllegalStateSubscription implements Subscription {
        private IllegalStateSubscription() {
        }

        public void request(long j) {
            throw new IllegalStateException();
        }

        public void cancel() {
            throw new IllegalStateException();
        }
    }

    /**
     * Creates a subscriber using a default duration of 25 seconds.
     *
     * @param servletOutputStream the servlet output stream the events are written to
     */
    public ServletSseSubscriber(ServletOutputStream servletOutputStream) {
        this(servletOutputStream, Duration.ofSeconds(25L));
    }

    /**
     * Creates a subscriber writing to the given stream.
     *
     * @param servletOutputStream the servlet output stream the events are written to
     * @param duration            passed through to {@link SseWriteableChannel}
     *                            (NOTE(review): presumably the keep-alive period - confirm
     *                            against SseWriteableChannel)
     */
    public ServletSseSubscriber(ServletOutputStream servletOutputStream, Duration duration) {
        this.isOpen = new AtomicBoolean(true);
        this.subscriptionRef = new AtomicReference<>(new IllegalStateSubscription());
        // Errors reported by the channel are routed into onError, which closes this subscriber.
        this.channel = new SseWriteableChannel(servletOutputStream, th -> {
            onError(th);
        }, duration);
    }

    /**
     * Stores the subscription and requests the initial batch of 25 events.
     *
     * <p>If this subscriber has already been closed (for example because the channel
     * reported an error before the subscription arrived), the subscription is cancelled
     * right away instead of requesting events that could no longer be written.
     *
     * @param subscription the subscription handed over by the publisher
     */
    public void onSubscribe(Subscription subscription) {
        this.subscriptionRef.set(subscription);
        if (this.isOpen.get()) {
            subscription.request(25L);
        } else {
            // close() already ran and can no longer cancel this late subscription.
            subscription.cancel();
        }
    }

    /**
     * Writes the event asynchronously and requests one more event once the write has
     * completed normally.
     *
     * @param serverSentEvent the event to write
     */
    public void onNext(ServerSentEvent serverSentEvent) {
        this.channel.writeEventAsync(serverSentEvent).thenAccept(written -> {
            this.subscriptionRef.get().request(1L);
        });
    }

    /**
     * Logs the error message at {@code FINE} level and closes this subscriber.
     *
     * @param th the error that terminated the stream
     */
    public void onError(Throwable th) {
        LOG.fine("error on source stream. stop streaming " + th.getMessage());
        close();
    }

    /** Closes this subscriber when the source stream has completed. */
    public void onComplete() {
        close();
    }

    /**
     * Cancels the current subscription and closes the channel. Only the first call has
     * an effect.
     *
     * <p>The channel is closed in a {@code finally} block: when no real subscription has
     * been set yet, {@code cancel()} throws {@link IllegalStateException}, and without
     * the {@code finally} the channel would stay open forever because {@code isOpen} has
     * already been flipped. The exception itself still propagates to the caller.
     */
    private void close() {
        if (this.isOpen.getAndSet(false)) {
            try {
                this.subscriptionRef.get().cancel();
            } finally {
                this.channel.close();
            }
        }
    }
}
