package org.apache.cxf.microprofile.client.sse;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.ws.rs.ext.Providers;
import javax.ws.rs.sse.InboundSseEvent;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:lib/cxf-rt-rs-mp-client-3.5.5.jar:org/apache/cxf/microprofile/client/sse/SsePublisher.class */
public class SsePublisher implements Publisher<InboundSseEvent> {
    final Executor executor;
    final BufferedReader br;
    final Providers providers;
    final List<SseSubscription> subscriptions = new CopyOnWriteArrayList();
    final AtomicBoolean isStarted = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    public SsePublisher(InputStream inputStream, Executor executor, Providers providers) {
        this.br = new BufferedReader(new InputStreamReader(inputStream));
        this.executor = executor;
        this.providers = providers;
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super InboundSseEvent> subscriber) {
        SseSubscription sseSubscription = new SseSubscription(this, subscriber);
        this.subscriptions.add(sseSubscription);
        sseSubscription.fireSubscribe();
        start();
    }

    private void start() {
        if (this.isStarted.compareAndSet(false, true)) {
            this.executor.execute(() -> {
                try {
                    BufferedReader bufferedReader = this.br;
                    Throwable th = null;
                    try {
                        SseEventBuilder sseEventBuilder = new SseEventBuilder(this.providers);
                        String readLine = this.br.readLine();
                        while (readLine != null && !this.subscriptions.isEmpty()) {
                            if (readLine.startsWith("data:")) {
                                sseEventBuilder.data(removeSpace(readLine.substring(5)));
                            } else if (readLine.startsWith("id:")) {
                                sseEventBuilder.id(removeSpace(readLine.substring(3)));
                            } else if (readLine.startsWith("event:")) {
                                sseEventBuilder.name(removeSpace(readLine.substring(6)));
                            } else if (readLine.startsWith(":")) {
                                sseEventBuilder.comment(removeSpace(readLine.substring(1)));
                            } else if ("".equals(readLine)) {
                                InboundSseEvent build = sseEventBuilder.build();
                                Iterator<SseSubscription> it = this.subscriptions.iterator();
                                while (it.hasNext()) {
                                    it.next().fireEvent(build);
                                }
                                sseEventBuilder = new SseEventBuilder(this.providers);
                            }
                            readLine = this.br.readLine();
                        }
                        Iterator<SseSubscription> it2 = this.subscriptions.iterator();
                        while (it2.hasNext()) {
                            it2.next().complete();
                        }
                        if (bufferedReader != null) {
                            if (0 != 0) {
                                try {
                                    bufferedReader.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                bufferedReader.close();
                            }
                        }
                    } catch (Throwable th3) {
                        if (bufferedReader != null) {
                            if (0 != 0) {
                                try {
                                    bufferedReader.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                bufferedReader.close();
                            }
                        }
                        throw th3;
                    }
                } catch (IOException e) {
                    Iterator<SseSubscription> it3 = this.subscriptions.iterator();
                    while (it3.hasNext()) {
                        it3.next().fireError(e);
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeSubscription(SseSubscription sseSubscription) {
        this.subscriptions.remove(sseSubscription);
    }

    private String removeSpace(String str) {
        return (str == null || !str.startsWith(" ")) ? str : str.substring(1);
    }
}
