package net.oneandone.reactive.rest.container;

import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.container.AsyncResponse;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import rx.Observable;
import rx.RxReactiveStreams;

/* loaded from: input_file:net/oneandone/reactive/rest/container/ObservableConsumer.class */
public class ObservableConsumer<T> implements BiConsumer<Observable<T>, Throwable> {
    private final AsyncResponse asyncResponse;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/oneandone/reactive/rest/container/ObservableConsumer$SingleEntityResponseSubscriber.class */
    public static class SingleEntityResponseSubscriber<T> implements Subscriber<T> {
        private final AtomicBoolean isOpen = new AtomicBoolean(true);
        private final AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
        private final AtomicBoolean isResponseProcessed = new AtomicBoolean();
        private final AsyncResponse response;

        public SingleEntityResponseSubscriber(AsyncResponse asyncResponse) {
            this.response = asyncResponse;
        }

        public void onSubscribe(Subscription subscription) {
            this.subscriptionRef.set(subscription);
            this.subscriptionRef.get().request(1L);
        }

        public void onNext(T t) {
            this.isResponseProcessed.set(true);
            this.response.resume(t);
        }

        public void onError(Throwable th) {
            this.isResponseProcessed.set(true);
            this.response.resume(unwrapIfNecessary(th, 10));
        }

        private static Throwable unwrapIfNecessary(Throwable th, int i) {
            Throwable cause;
            return (!ObservableConsumer.isCompletionException(th) || (cause = th.getCause()) == null) ? th : i > 1 ? unwrapIfNecessary(cause, i - 1) : cause;
        }

        public void onComplete() {
            if (!this.isResponseProcessed.get()) {
                onError(new NotFoundException());
            }
            close();
        }

        private void close() {
            if (this.isOpen.getAndSet(false)) {
                this.subscriptionRef.get().cancel();
            }
        }
    }

    private ObservableConsumer(AsyncResponse asyncResponse) {
        this.asyncResponse = asyncResponse;
    }

    public static final <T> BiConsumer<Observable<T>, Throwable> writeSingleTo(AsyncResponse asyncResponse) {
        return new ObservableConsumer(asyncResponse);
    }

    @Override // java.util.function.BiConsumer
    public void accept(Observable<T> observable, Throwable th) {
        SingleEntityResponseSubscriber singleEntityResponseSubscriber = new SingleEntityResponseSubscriber(this.asyncResponse);
        if (th != null) {
            singleEntityResponseSubscriber.onError(th);
        } else {
            RxReactiveStreams.subscribe(observable, singleEntityResponseSubscriber);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean isCompletionException(Throwable th) {
        return CompletionException.class.isAssignableFrom(th.getClass());
    }
}
