package net.oneandone.reactive.sse.servlet;

import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import net.oneandone.reactive.sse.ServerSentEvent;
import net.oneandone.reactive.sse.ServerSentEventParser;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/oneandone/reactive/sse/servlet/SseReadableChannel.class */
public class SseReadableChannel {
    private final Object pendingConsumesLock = new Object();
    private final AtomicInteger numPendingConsumes = new AtomicInteger(0);
    private final SSEInputStream serverSentEventsStream;
    private final Consumer<ServerSentEvent> eventConsumer;
    private final Consumer<Throwable> errorConsumer;
    private final Consumer<Void> completionConsumer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/oneandone/reactive/sse/servlet/SseReadableChannel$SSEInputStream.class */
    public static class SSEInputStream implements Closeable {
        private final Queue<ServerSentEvent> bufferedEvents = Lists.newLinkedList();
        private final ServerSentEventParser parser = new ServerSentEventParser();
        private final byte[] buf = new byte[1024];
        private int len = -1;
        private final ServletInputStream is;

        public SSEInputStream(ServletInputStream servletInputStream) {
            this.is = servletInputStream;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            Closeables.closeQuietly(this.is);
        }

        public Optional<ServerSentEvent> next() throws IOException {
            if (this.bufferedEvents.isEmpty()) {
                while (this.bufferedEvents.isEmpty() && isNetworkdataAvailable()) {
                    int read = this.is.read(this.buf);
                    this.len = read;
                    if (read <= 0) {
                        break;
                    }
                    this.parser.parse(ByteBuffer.wrap(this.buf, 0, this.len)).forEach(serverSentEvent -> {
                        this.bufferedEvents.add(serverSentEvent);
                    });
                }
            }
            return Optional.ofNullable(this.bufferedEvents.poll());
        }

        private boolean isNetworkdataAvailable() {
            try {
                return this.is.isReady();
            } catch (IllegalStateException e) {
                return false;
            }
        }
    }

    /* loaded from: input_file:net/oneandone/reactive/sse/servlet/SseReadableChannel$ServletReadListener.class */
    private final class ServletReadListener implements ReadListener {
        private ServletReadListener() {
        }

        public void onAllDataRead() throws IOException {
            SseReadableChannel.this.completionConsumer.accept(null);
        }

        public void onError(Throwable th) {
            SseReadableChannel.this.onError(th);
        }

        public void onDataAvailable() throws IOException {
            SseReadableChannel.this.proccessPendingReads();
        }
    }

    public SseReadableChannel(ServletInputStream servletInputStream, Consumer<ServerSentEvent> consumer, Consumer<Throwable> consumer2, Consumer<Void> consumer3) {
        this.eventConsumer = consumer;
        this.errorConsumer = consumer2;
        this.completionConsumer = consumer3;
        this.serverSentEventsStream = new SSEInputStream(servletInputStream);
        servletInputStream.setReadListener(new ServletReadListener());
    }

    public void consumeNextEvent() {
        synchronized (this.pendingConsumesLock) {
            try {
                Optional<ServerSentEvent> next = this.serverSentEventsStream.next();
                if (next.isPresent()) {
                    this.eventConsumer.accept(next.get());
                } else {
                    this.numPendingConsumes.incrementAndGet();
                }
            } catch (IOException | RuntimeException e) {
                onError(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void proccessPendingReads() {
        synchronized (this.pendingConsumesLock) {
            while (this.numPendingConsumes.get() > 0) {
                try {
                    Optional<ServerSentEvent> next = this.serverSentEventsStream.next();
                    if (!next.isPresent()) {
                        return;
                    }
                    this.numPendingConsumes.decrementAndGet();
                    this.eventConsumer.accept(next.get());
                } catch (IOException | RuntimeException e) {
                    onError(e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onError(Throwable th) {
        this.errorConsumer.accept(th);
        close();
    }

    public void close() {
        this.serverSentEventsStream.close();
    }
}
