package org.apache.cxf.microprofile.client.sse;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.sse.InboundSseEvent;
import org.apache.cxf.common.logging.LogUtils;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:lib/cxf-rt-rs-mp-client-3.4.8.jar:org/apache/cxf/microprofile/client/sse/SseTypeSafeProcessor.class */
/**
 * Reactive Streams {@link Processor} that receives raw {@link InboundSseEvent}s, decodes each
 * event's payload with {@link InboundSseEvent#readData(GenericType)} and forwards the decoded
 * value to every registered downstream {@link Subscriber}.
 *
 * <p>The first call to {@link #subscribe(Subscriber)} subscribes this processor to the
 * {@link SsePublisher} passed to the constructor; later calls only register the new subscriber.
 *
 * <p>NOTE(review): the subscriber list is a plain {@link LinkedList} and
 * {@code incomingSubscription} is a plain field, so nothing here guards against
 * {@code subscribe} running concurrently with signal delivery — confirm the threading model
 * of the publisher before relying on that.
 *
 * @param <T> the type each event payload is decoded to
 */
public class SseTypeSafeProcessor<T> implements Processor<InboundSseEvent, T> {
    private static final Logger LOG = LogUtils.getL7dLogger(SseTypeSafeProcessor.class);

    /** Upstream subscription handed to {@link #onSubscribe}; {@code null} until then. */
    private SseSubscription incomingSubscription;
    private final SsePublisher ssePublisher;
    /** Target type passed to {@code InboundSseEvent.readData}. */
    private final GenericType<T> type;
    private final List<Subscriber<? super T>> subscribers = new LinkedList<>();
    /** Set once by the first terminal signal ({@code onError} or {@code onComplete}). */
    private final AtomicBoolean isClosed = new AtomicBoolean();
    /** Set once this processor has subscribed itself to {@link #ssePublisher}. */
    private final AtomicBoolean isSubscribed = new AtomicBoolean();

    /**
     * Creates a processor that decodes events to {@code genericType}.
     *
     * @param genericType  the type each event payload is read as
     * @param ssePublisher the upstream publisher this processor subscribes to on first
     *                     {@link #subscribe(Subscriber)}
     */
    public SseTypeSafeProcessor(GenericType<T> genericType, SsePublisher ssePublisher) {
        this.type = genericType;
        this.ssePublisher = ssePublisher;
    }

    /**
     * Stores the upstream subscription.
     *
     * @param subscription the upstream subscription; must be an {@code SseSubscription}
     * @throws ClassCastException if {@code subscription} is not an {@code SseSubscription}
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        this.incomingSubscription = (SseSubscription) subscription;
        LOG.finest("onSubscribe " + subscription);
    }

    /**
     * Decodes the event payload and forwards it to every registered subscriber. Events that
     * arrive after a terminal signal are ignored.
     *
     * @param inboundSseEvent the raw event received from upstream
     * @throws IllegalStateException if no upstream subscription has been received yet
     */
    @Override
    public void onNext(InboundSseEvent inboundSseEvent) {
        LOG.entering(SseTypeSafeProcessor.class.getName(), "onNext", inboundSseEvent);
        if (this.incomingSubscription == null) {
            throw new IllegalStateException("not subscribed");
        }
        if (!this.isClosed.get()) {
            // Decode once, then fan the same value out to all subscribers.
            final T data = inboundSseEvent.readData(this.type);
            for (Subscriber<? super T> subscriber : this.subscribers) {
                subscriber.onNext(data);
            }
        }
        LOG.exiting(SseTypeSafeProcessor.class.getName(), "onNext");
    }

    /**
     * Forwards the error to every registered subscriber. Only the first terminal signal
     * ({@code onError} or {@code onComplete}) is forwarded; later ones are dropped.
     *
     * @param th the error reported by upstream
     */
    @Override
    public void onError(Throwable th) {
        if (this.isClosed.compareAndSet(false, true)) {
            for (Subscriber<? super T> subscriber : this.subscribers) {
                subscriber.onError(th);
            }
        }
    }

    /**
     * Forwards completion to every registered subscriber. Only the first terminal signal
     * ({@code onError} or {@code onComplete}) is forwarded; later ones are dropped.
     */
    @Override
    public void onComplete() {
        if (this.isClosed.compareAndSet(false, true)) {
            for (Subscriber<? super T> subscriber : this.subscribers) {
                subscriber.onComplete();
            }
        }
    }

    /**
     * Registers a downstream subscriber and hands it a subscription built from the upstream
     * subscription. The first call also subscribes this processor to the upstream publisher.
     *
     * @param subscriber the downstream subscriber; must not be {@code null}
     * @throws NullPointerException  if {@code subscriber} is {@code null}
     * @throws IllegalStateException if no upstream subscription is available after
     *                               subscribing to the publisher
     */
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        LOG.finest("subscribe " + subscriber);
        if (subscriber == null) {
            // Reject up front so a null entry never reaches the fan-out list.
            throw new NullPointerException("subscriber must not be null");
        }
        // Registered before onSubscribe so that anything delivered while the subscriber is
        // handling onSubscribe already reaches it.
        this.subscribers.add(subscriber);
        if (this.isSubscribed.compareAndSet(false, true)) {
            // NOTE(review): the null check below presumes the publisher invokes onSubscribe
            // on this processor before subscribe returns — confirm against SsePublisher.
            this.ssePublisher.subscribe(this);
        }
        final SseSubscription upstream = this.incomingSubscription;
        if (upstream == null) {
            // Undo the registration: this subscriber never got onSubscribe, so it must not
            // receive onNext/onError/onComplete later.
            this.subscribers.remove(subscriber);
            throw new IllegalStateException("no upstream subscription available");
        }
        subscriber.onSubscribe(new SseTypeSafeSubscription(upstream));
    }
}
