package net.oneandone.reactive.pipe;

import java.util.Optional;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:net/oneandone/reactive/pipe/PublisherSourcedPipe.class */
class PublisherSourcedPipe<T> implements Pipe<T> {
    private final Publisher<T> publisher;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/oneandone/reactive/pipe/PublisherSourcedPipe$ConsumerAdapter.class */
    /**
     * {@link Subscriber} that delegates every signal to a plain callback: elements go to
     * {@code consumer}, failures to {@code errorConsumer} and completion to {@code completeConsumer}.
     *
     * <p>Elements are pulled one at a time: a single element is requested on subscription and one
     * more after each element has been accepted by the element callback.
     *
     * @param <E> type of the consumed elements
     */
    public static final class ConsumerAdapter<E> implements Subscriber<E> {
        private final Consumer<? super E> consumer;
        private final Consumer<? super Throwable> errorConsumer;
        private final Consumer<Void> completeConsumer;
        private final AtomicReference<Optional<Subscription>> subscriptionRef = new AtomicReference<>(Optional.empty());

        /**
         * @param consumer  callback invoked with each element
         * @param consumer2 callback invoked with the failure passed to {@link #onError(Throwable)}
         * @param consumer3 callback invoked (with {@code null}) on completion
         */
        public ConsumerAdapter(Consumer<? super E> consumer, Consumer<? super Throwable> consumer2, Consumer<Void> consumer3) {
            this.consumer = consumer;
            this.errorConsumer = consumer2;
            this.completeConsumer = consumer3;
        }

        /**
         * Stores the subscription and requests the first element.
         *
         * <p>If a subscription is already active, the new one is cancelled and the active one is
         * kept (Reactive Streams rule 2.5) instead of silently replacing it.
         */
        public void onSubscribe(Subscription subscription) {
            if (this.subscriptionRef.get().isPresent()) {
                subscription.cancel();
                return;
            }
            this.subscriptionRef.set(Optional.of(subscription));
            subscription.request(1L);
        }

        /**
         * Hands the element to the element callback and then requests the next one. A
         * {@link RuntimeException} thrown while doing so is routed to {@link #onError(Throwable)}.
         */
        public void onNext(E e) {
            try {
                this.consumer.accept(e);
                this.subscriptionRef.get().ifPresent(subscription -> subscription.request(1L));
            } catch (RuntimeException e2) {
                onError(e2);
            }
        }

        /**
         * Reports the failure to the error callback and cancels the subscription. The cancel runs in
         * a {@code finally} block, so it happens even if the error callback itself throws.
         */
        public void onError(Throwable th) {
            try {
                this.errorConsumer.accept(th);
            } finally {
                this.subscriptionRef.get().ifPresent(subscription -> subscription.cancel());
            }
        }

        /** Notifies the completion callback; {@code null} is passed because the callback takes a {@code Void}. */
        public void onComplete() {
            this.completeConsumer.accept(null);
        }
    }

    /* loaded from: input_file:net/oneandone/reactive/pipe/PublisherSourcedPipe$FilteringProcessor.class */
    /**
     * Processor that passes on only those elements for which the predicate returns {@code true}.
     * Elements are forwarded unchanged (identity mapping).
     *
     * @param <T> element type
     */
    private static final class FilteringProcessor<T> extends ForwardingProcessor<T, T> {
        private final Predicate<? super T> predicate;

        /**
         * @param publisher the source to filter
         * @param predicate decides which elements are forwarded
         */
        public FilteringProcessor(Publisher<T> publisher, Predicate<? super T> predicate) {
            super(publisher, Function.<T>identity());
            this.predicate = predicate;
        }

        /**
         * Forwards the element when the predicate accepts it; otherwise drops it and requests one
         * more element from the source in its place.
         */
        @Override // net.oneandone.reactive.pipe.PublisherSourcedPipe.ForwardingProcessor
        public void onNext(T t) {
            boolean accepted = this.predicate.test(t);
            if (!accepted) {
                request(1L);
                return;
            }
            super.onNext(t);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/oneandone/reactive/pipe/PublisherSourcedPipe$ForwardingProcessor.class */
    /**
     * Processor that subscribes to a source publisher on construction, applies a mapping function
     * to each element and passes the result to the currently registered downstream subscriber.
     *
     * <p>The processor also acts as the downstream subscriber's {@link Subscription}: demand and
     * cancellation coming from downstream are relayed to the source through a
     * {@link ForwardingSubscription}.
     *
     * @param <T> element type received from the source
     * @param <R> element type passed downstream
     */
    public static class ForwardingProcessor<T, R> implements Processor<T, R>, Subscription {
        private final Function<? super T, ? extends R> mapper;
        private final ForwardingSubscription forwardingSubscription = new ForwardingSubscription();
        private final AtomicReference<Optional<Subscriber<? super R>>> sinkSubscriberRef = new AtomicReference<>(Optional.empty());
        /** Set once the mapper has thrown; from then on no further signal is passed downstream. */
        private volatile boolean mapperFailed;

        /**
         * Relays {@code request}/{@code cancel} calls to the source subscription and buffers them
         * while the source subscription has not been provided yet.
         *
         * <p>All state changes and all calls into the source subscription happen while holding this
         * object's monitor, so calls to the source subscription never overlap.
         */
        public static final class ForwardingSubscription implements Subscription {
            private long pendingRequests;
            private boolean pendingCancel;
            private Subscription sourceSubscription;

            private ForwardingSubscription() {
                this.pendingRequests = 0L;
                this.pendingCancel = false;
                this.sourceSubscription = null;
            }

            /**
             * Installs the source subscription and replays what was buffered before it arrived.
             *
             * <p>A buffered cancel wins over buffered demand: the source is cancelled without
             * requesting anything first. If a source subscription is already installed, the new one
             * is cancelled and the installed one is kept (Reactive Streams rule 2.5).
             */
            void setSubscription(Subscription subscription) {
                synchronized (this) {
                    if (this.sourceSubscription != null) {
                        subscription.cancel();
                        return;
                    }
                    this.sourceSubscription = subscription;
                    if (this.pendingCancel) {
                        this.pendingCancel = false;
                        this.pendingRequests = 0L;
                        subscription.cancel();
                        return;
                    }
                    if (this.pendingRequests > 0) {
                        long backlog = this.pendingRequests;
                        this.pendingRequests = 0L;
                        subscription.request(backlog);
                    }
                }
            }

            /**
             * Passes the demand to the source subscription, or accumulates it while there is none.
             * Non-positive amounts are not validated here; they are handled exactly like any other
             * amount.
             */
            public void request(long j) {
                synchronized (this) {
                    if (this.sourceSubscription != null) {
                        this.sourceSubscription.request(j);
                        return;
                    }
                    this.pendingRequests = addCapped(this.pendingRequests, j);
                }
            }

            /** Cancels the source subscription, or remembers the cancel while there is none. */
            public void cancel() {
                synchronized (this) {
                    if (this.sourceSubscription == null) {
                        this.pendingCancel = true;
                    } else {
                        this.sourceSubscription.cancel();
                    }
                }
            }

            /**
             * Adds two demand values, clamping to {@link Long#MAX_VALUE} when two positive values
             * overflow. Without the clamp, e.g. {@code request(Long.MAX_VALUE)} followed by
             * {@code request(1)} would wrap to a negative backlog that is never forwarded.
             */
            private static long addCapped(long current, long extra) {
                long sum = current + extra;
                boolean wrapped = current > 0 && extra > 0 && sum < 0;
                return wrapped ? Long.MAX_VALUE : sum;
            }
        }

        /**
         * Creates the processor and immediately subscribes it to {@code publisher}.
         *
         * @param publisher the source of elements
         * @param function  mapping applied to each element before it is passed downstream
         */
        public ForwardingProcessor(Publisher<T> publisher, Function<? super T, ? extends R> function) {
            this.mapper = function;
            // NOTE(review): `this` is handed to the publisher before subclass constructors have
            // finished, so subclass fields may still be unset if the publisher signals right away.
            publisher.subscribe(this);
        }

        /** Installs the source subscription; buffered demand or cancellation is replayed to it. */
        public void onSubscribe(Subscription subscription) {
            this.forwardingSubscription.setSubscription(subscription);
        }

        /**
         * Maps the element and passes the result to the downstream subscriber, if there is one.
         *
         * <p>If the mapper throws a {@link RuntimeException}, the source is cancelled and the
         * exception is signalled downstream via {@code onError} instead of being thrown back into
         * the source publisher. Signals arriving after such a failure are dropped.
         */
        public void onNext(T t) {
            if (this.mapperFailed) {
                return;
            }
            this.sinkSubscriberRef.get().ifPresent(subscriber -> {
                R mapped;
                try {
                    mapped = this.mapper.apply(t);
                } catch (RuntimeException mappingFailure) {
                    this.mapperFailed = true;
                    cancel();
                    subscriber.onError(mappingFailure);
                    return;
                }
                subscriber.onNext(mapped);
            });
        }

        /** Passes the failure to the downstream subscriber, unless a mapper failure was already signalled. */
        public void onError(Throwable th) {
            if (this.mapperFailed) {
                return;
            }
            this.sinkSubscriberRef.get().ifPresent(subscriber -> subscriber.onError(th));
        }

        /** Passes completion to the downstream subscriber, unless a mapper failure was already signalled. */
        public void onComplete() {
            if (this.mapperFailed) {
                return;
            }
            this.sinkSubscriberRef.get().ifPresent(subscriber -> subscriber.onComplete());
        }

        /**
         * Registers the downstream subscriber and hands it this processor as its subscription.
         *
         * <p>NOTE(review): a later subscriber replaces an earlier one; the earlier one receives no
         * further signals and is not notified.
         */
        public void subscribe(Subscriber<? super R> subscriber) {
            this.sinkSubscriberRef.set(Optional.of(subscriber));
            subscriber.onSubscribe(this);
        }

        /**
         * Relays downstream demand to the source on a {@link ForkJoinPool#commonPool()} task rather
         * than on the caller's thread.
         * NOTE(review): presumably this avoids unbounded recursion when the source emits
         * synchronously from within {@code request} -- confirm.
         */
        public void request(long j) {
            ForkJoinPool.commonPool().execute(() -> this.forwardingSubscription.request(j));
        }

        /** Cancels the source subscription (or records the cancel until the source subscription arrives). */
        public void cancel() {
            this.forwardingSubscription.cancel();
        }
    }

    /* loaded from: input_file:net/oneandone/reactive/pipe/PublisherSourcedPipe$LimittingPublisher.class */
    /**
     * Processor that forwards at most {@code max} elements unchanged and then ends the stream:
     * the source is cancelled and the downstream subscriber is sent {@code onComplete}.
     *
     * <p>The counter and the terminal flag are plain fields; like the original counter they rely on
     * the source delivering its signals one at a time.
     *
     * @param <T> element type
     */
    private static final class LimittingPublisher<T> extends ForwardingProcessor<T, T> {
        private final long max;
        private long numProcessed;
        /** True once a terminal signal has been passed downstream; later signals are ignored. */
        private boolean done;

        /**
         * @param publisher the source to truncate
         * @param j         maximum number of elements to forward
         */
        public LimittingPublisher(Publisher<T> publisher, long j) {
            super(publisher, Function.<T>identity());
            this.max = j;
        }

        /**
         * Forwards the element while the limit has not been reached. As soon as the limit is
         * reached (immediately, for a limit of zero or less) the source is cancelled and downstream
         * is completed, so a limited pipe over a longer source still terminates.
         */
        @Override // net.oneandone.reactive.pipe.PublisherSourcedPipe.ForwardingProcessor
        public void onNext(T t) {
            if (this.done) {
                // element that was still in flight when the limit was reached
                return;
            }
            if (this.numProcessed < this.max) {
                this.numProcessed++;
                super.onNext(t);
            }
            if (this.numProcessed >= this.max) {
                this.done = true;
                cancel();
                super.onComplete();
            }
        }

        /** Forwards the failure unless the stream has already been terminated downstream. */
        @Override // net.oneandone.reactive.pipe.PublisherSourcedPipe.ForwardingProcessor
        public void onError(Throwable th) {
            if (this.done) {
                return;
            }
            this.done = true;
            super.onError(th);
        }

        /** Forwards completion unless the stream has already been terminated downstream. */
        @Override // net.oneandone.reactive.pipe.PublisherSourcedPipe.ForwardingProcessor
        public void onComplete() {
            if (this.done) {
                return;
            }
            this.done = true;
            super.onComplete();
        }
    }

    /* loaded from: input_file:net/oneandone/reactive/pipe/PublisherSourcedPipe$SkippingProcessor.class */
    /**
     * Processor that drops the first {@code numToSkip} elements and forwards every later element
     * unchanged.
     *
     * @param <T> element type
     */
    private static final class SkippingProcessor<T> extends ForwardingProcessor<T, T> {
        private final long numToSkip;
        private long numProcessed;

        /**
         * @param publisher the source whose leading elements are dropped
         * @param j         number of leading elements to drop
         */
        public SkippingProcessor(Publisher<T> publisher, long j) {
            super(publisher, Function.<T>identity());
            this.numToSkip = j;
        }

        /**
         * Counts the element; while the count is still within the skipped prefix the element is
         * dropped and one more is requested from the source, otherwise it is forwarded.
         */
        @Override // net.oneandone.reactive.pipe.PublisherSourcedPipe.ForwardingProcessor
        public void onNext(T t) {
            this.numProcessed++;
            boolean pastSkippedPrefix = this.numProcessed > this.numToSkip;
            if (pastSkippedPrefix) {
                super.onNext(t);
                return;
            }
            request(1L);
        }
    }

    /**
     * Creates a pipe that is fed by the given publisher.
     *
     * @param publisher the source every operation of this pipe subscribes to; stored as-is without
     *                  a null check
     */
    public PublisherSourcedPipe(Publisher<T> publisher) {
        this.publisher = publisher;
    }

    /**
     * Subscribes the given subscriber directly to this pipe's publisher.
     *
     * @param subscriber the subscriber that will receive the pipe's signals
     */
    @Override // net.oneandone.reactive.pipe.Pipe
    public void consume(Subscriber<? super T> subscriber) {
        publisher.subscribe(subscriber);
    }

    /**
     * Consumes the pipe's elements with the given callback. No error callback is supplied, so
     * failures are handed to a no-op consumer and thereby dropped.
     *
     * @param consumer callback invoked with each element
     */
    @Override // net.oneandone.reactive.pipe.Pipe
    public void consume(Consumer<? super T> consumer) {
        Consumer<? super Throwable> ignoreFailure = failure -> {
        };
        consume(consumer, ignoreFailure);
    }

    /**
     * Consumes the pipe's elements and failures with the given callbacks. Completion is handed to
     * a no-op consumer.
     *
     * @param consumer  callback invoked with each element
     * @param consumer2 callback invoked with a failure
     */
    @Override // net.oneandone.reactive.pipe.Pipe
    public void consume(Consumer<? super T> consumer, Consumer<? super Throwable> consumer2) {
        Consumer<Void> ignoreCompletion = unused -> {
        };
        consume(consumer, consumer2, ignoreCompletion);
    }

    /**
     * Consumes the pipe with one callback per signal kind by wrapping the callbacks in a
     * {@link ConsumerAdapter} and subscribing that adapter.
     *
     * @param consumer  callback invoked with each element
     * @param consumer2 callback invoked with a failure
     * @param consumer3 callback invoked on completion
     */
    @Override // net.oneandone.reactive.pipe.Pipe
    public void consume(Consumer<? super T> consumer, Consumer<? super Throwable> consumer2, Consumer<Void> consumer3) {
        // Parameterized instead of a raw type so the compiler checks the callback types.
        consume(new ConsumerAdapter<T>(consumer, consumer2, consumer3));
    }

    /**
     * Returns a new pipe whose elements are the results of applying {@code function} to this
     * pipe's elements. The mapping processor subscribes to this pipe's publisher as soon as it is
     * constructed, i.e. during this call.
     *
     * @param function mapping applied to each element
     * @param <V>      element type of the returned pipe
     * @return a pipe fed by the mapping processor
     */
    @Override // net.oneandone.reactive.pipe.Pipe
    public <V> PublisherSourcedPipe<V> map(Function<? super T, ? extends V> function) {
        // Parameterized instead of a raw type so the element types are checked at compile time.
        return new PublisherSourcedPipe<V>(new ForwardingProcessor<T, V>(this.publisher, function));
    }

    /**
     * Returns a new pipe that carries only the elements accepted by {@code predicate}. The
     * filtering processor subscribes to this pipe's publisher as soon as it is constructed, i.e.
     * during this call.
     *
     * @param predicate decides which elements are kept
     * @return a pipe fed by the filtering processor
     */
    @Override // net.oneandone.reactive.pipe.Pipe
    public Pipe<T> filter(Predicate<? super T> predicate) {
        // Parameterized instead of raw types so the element types are checked at compile time.
        return new PublisherSourcedPipe<T>(new FilteringProcessor<T>(this.publisher, predicate));
    }

    /**
     * Returns a new pipe that drops the first {@code j} elements of this pipe. The skipping
     * processor subscribes to this pipe's publisher as soon as it is constructed, i.e. during this
     * call.
     *
     * @param j number of leading elements to drop
     * @return a pipe fed by the skipping processor
     */
    @Override // net.oneandone.reactive.pipe.Pipe
    public Pipe<T> skip(long j) {
        // Parameterized instead of raw types so the element types are checked at compile time.
        return new PublisherSourcedPipe<T>(new SkippingProcessor<T>(this.publisher, j));
    }

    /**
     * Returns a new pipe that is fed by a limiting processor constructed with {@code j} as its
     * maximum. The processor subscribes to this pipe's publisher as soon as it is constructed,
     * i.e. during this call.
     *
     * @param j maximum passed to the limiting processor
     * @return a pipe fed by the limiting processor
     */
    @Override // net.oneandone.reactive.pipe.Pipe
    public Pipe<T> limit(long j) {
        // Parameterized instead of raw types so the element types are checked at compile time.
        return new PublisherSourcedPipe<T>(new LimittingPublisher<T>(this.publisher, j));
    }
}
