package net.soundvibe.reacto.vertx.events;

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import net.soundvibe.reacto.client.events.EventSource;

/* loaded from: input_file:net/soundvibe/reacto/vertx/events/VertxEventSource.class */
/**
 * {@link EventSource} implementation backed by a Vert.x {@link HttpClient}.
 *
 * <p>Listeners are registered through the fluent {@code onOpen}/{@code onError}/{@code onMessage}
 * style methods and are invoked in registration order. {@link #open()} issues a GET request to the
 * configured URL with an {@code Accept: text/event-stream} header; every received buffer that
 * starts with {@link #DATA_PREFIX} is forwarded (without the prefix) to the message listeners.
 *
 * <p>NOTE(review): each received buffer is treated as exactly one message. A buffer holding
 * several event lines, or only part of one, is neither split nor reassembled - confirm that the
 * server side always emits one event per chunk.
 */
public final class VertxEventSource implements EventSource {
    private static final Logger log = LoggerFactory.getLogger(VertxEventSource.class);

    /** Prefix a received buffer must start with to be dispatched as a message. */
    public static final String DATA_PREFIX = "data: ";

    private final Vertx vertx;
    private final String url;

    /** Non-null only between a successful {@link #open()} and the next {@link #close()}. */
    private HttpClient httpClient;

    private final List<Runnable> onOpenListeners = new ArrayList<>();
    private final List<Consumer<Throwable>> onErrorListeners = new ArrayList<>();
    private final List<Consumer<String>> onMessageListeners = new ArrayList<>();

    /**
     * Creates an event source that is not yet connected.
     *
     * @param vertx Vert.x instance used to create the HTTP client
     * @param str   absolute URL of the event stream
     * @throws NullPointerException if either argument is {@code null}
     */
    public VertxEventSource(Vertx vertx, String str) {
        Objects.requireNonNull(vertx, "vertx cannot be null");
        Objects.requireNonNull(str, "url cannot be null");
        this.vertx = vertx;
        this.url = str;
    }

    /**
     * Registers a listener that is run when the underlying connection is established.
     * (Decompiler-assigned name; kept as-is so existing references keep resolving.)
     *
     * @param runnable listener to run on connection open
     * @return this instance, for chaining
     * @throws NullPointerException if {@code runnable} is {@code null}
     */
    public VertxEventSource m5onOpen(Runnable runnable) {
        // Fail at registration instead of later inside an event-loop callback.
        Objects.requireNonNull(runnable, "onOpen listener cannot be null");
        this.onOpenListeners.add(runnable);
        return this;
    }

    /**
     * Registers a listener that receives any error reported by the request.
     *
     * @param consumer listener to notify with the failure
     * @return this instance, for chaining
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public VertxEventSource onError(Consumer<Throwable> consumer) {
        Objects.requireNonNull(consumer, "onError listener cannot be null");
        this.onErrorListeners.add(consumer);
        return this;
    }

    /**
     * Registers a listener that receives the payload of each incoming message.
     *
     * @param consumer listener to notify with the text following {@link #DATA_PREFIX}
     * @return this instance, for chaining
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public VertxEventSource onMessage(Consumer<String> consumer) {
        Objects.requireNonNull(consumer, "onMessage listener cannot be null");
        this.onMessageListeners.add(consumer);
        return this;
    }

    /**
     * Creates the HTTP client and sends the GET request for the event stream.
     *
     * @throws IllegalStateException if this event source is already open
     */
    public void open() {
        if (this.httpClient != null) {
            throw new IllegalStateException("EventSource already open");
        }
        HttpClientOptions options = new HttpClientOptions()
                .setTcpKeepAlive(true)
                .setReceiveBufferSize(32768000)
                .setKeepAlive(true)
                .setMaxChunkSize(32768000);
        this.httpClient = this.vertx.createHttpClient(options);
        this.httpClient.getAbs(this.url, this::handleResponse)
                .exceptionHandler(this::handleError)
                .endHandler(ignored -> close())
                .connectionHandler(this::handleConnectionOpen)
                .setChunked(true)
                .putHeader("Accept", "text/event-stream")
                .putHeader("Connection", "keep-alive")
                .end();
    }

    /**
     * Installs the body handler that turns received buffers into messages.
     * Buffers that do not start with {@link #DATA_PREFIX} are ignored.
     */
    private void handleResponse(HttpClientResponse httpClientResponse) {
        httpClientResponse.handler(buffer -> {
            log.info("Received response: " + buffer.length());
            // A distinct local name is required: a lambda body may not redeclare its parameter.
            String text = buffer.toString();
            if (text == null || !text.startsWith(DATA_PREFIX)) {
                return;
            }
            String payload = text.substring(DATA_PREFIX.length());
            this.onMessageListeners.forEach(listener -> listener.accept(payload));
        });
    }

    /**
     * Logs the failure, notifies every error listener and closes the event source.
     */
    private void handleError(Throwable th) {
        // Pass the throwable as the cause so the stack trace is not lost.
        log.error("EventSource error: " + th, th);
        try {
            this.onErrorListeners.forEach(listener -> listener.accept(th));
        } finally {
            // Release the client even when a listener throws.
            close();
        }
    }

    /** Logs the new connection and runs every open listener. */
    private void handleConnectionOpen(HttpConnection httpConnection) {
        log.info("Connection established");
        this.onOpenListeners.forEach(Runnable::run);
    }

    /**
     * Closes the HTTP client if one is open; calling this on a closed event source does nothing.
     * After closing, {@link #open()} may be called again.
     */
    public void close() {
        if (this.httpClient != null) {
            this.httpClient.close();
            log.info("EventSource closed");
            this.httpClient = null;
        }
    }

    /**
     * Compiler-generated bridge delegating to {@link #onMessage(Consumer)}
     * (decompiler-assigned name).
     */
    @SuppressWarnings("unchecked") // raw Consumer is inherent to the erased bridge signature
    public /* bridge */ /* synthetic */ EventSource m3onMessage(Consumer consumer) {
        return onMessage((Consumer<String>) consumer);
    }

    /**
     * Compiler-generated bridge delegating to {@link #onError(Consumer)}
     * (decompiler-assigned name).
     */
    @SuppressWarnings("unchecked") // raw Consumer is inherent to the erased bridge signature
    public /* bridge */ /* synthetic */ EventSource m4onError(Consumer consumer) {
        return onError((Consumer<Throwable>) consumer);
    }
}
