package com.github.davidmoten.rx.internal.operators;

import com.github.davidmoten.rx.buffertofile.DataSerializer;
import com.github.davidmoten.rx.buffertofile.Options;
import com.github.davidmoten.util.Preconditions;
import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.internal.operators.BackpressureUtils;
import rx.observers.Subscribers;

/* loaded from: input_file:com/github/davidmoten/rx/internal/operators/OperatorBufferToFile.class */
public final class OperatorBufferToFile<T> implements Observable.Operator<T, T> {
    private final DataSerializer<T> dataSerializer;
    private final Scheduler scheduler;
    private final Options options;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/davidmoten/rx/internal/operators/OperatorBufferToFile$OnSubscribeFromQueue.class */
    public static final class OnSubscribeFromQueue<T> implements Observable.OnSubscribe<T> {
        private final AtomicReference<QueueProducer<T>> queueProducer;
        private final QueueWithResources<T> queue;
        private final Scheduler.Worker worker;
        private final Options options;

        OnSubscribeFromQueue(AtomicReference<QueueProducer<T>> atomicReference, QueueWithResources<T> queueWithResources, Scheduler.Worker worker, Options options) {
            this.queueProducer = atomicReference;
            this.queue = queueWithResources;
            this.worker = worker;
            this.options = options;
        }

        public void call(Subscriber<? super T> subscriber) {
            QueueProducer<T> queueProducer = new QueueProducer<>(this.queue, subscriber, this.worker, this.options.delayError());
            this.queueProducer.set(queueProducer);
            subscriber.setProducer(queueProducer);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/davidmoten/rx/internal/operators/OperatorBufferToFile$ParentSubscriber.class */
    public static final class ParentSubscriber<T> extends Subscriber<T> {
        private final AtomicReference<QueueProducer<T>> queueProducer;

        ParentSubscriber(AtomicReference<QueueProducer<T>> atomicReference) {
            this.queueProducer = atomicReference;
        }

        public void onStart() {
            request(Long.MAX_VALUE);
        }

        public void onCompleted() {
            this.queueProducer.get().onCompleted();
        }

        public void onError(Throwable th) {
            this.queueProducer.get().onError(th);
        }

        public void onNext(T t) {
            this.queueProducer.get().onNext(t);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/davidmoten/rx/internal/operators/OperatorBufferToFile$QueueProducer.class */
    public static final class QueueProducer<T> extends AtomicLong implements Producer, Action0 {
        private static final long serialVersionUID = 2521533710633950102L;
        private final QueueWithResources<T> queue;
        private final Subscriber<? super T> child;
        private final Scheduler.Worker worker;
        private final boolean delayError;
        private final AtomicInteger drainRequested = new AtomicInteger(0);
        private Throwable error = null;
        private volatile boolean done = false;

        QueueProducer(QueueWithResources<T> queueWithResources, Subscriber<? super T> subscriber, Scheduler.Worker worker, boolean z) {
            this.queue = queueWithResources;
            this.child = subscriber;
            this.worker = worker;
            this.delayError = z;
        }

        void onNext(T t) {
            if (this.queue.offer(t)) {
                drain();
            } else {
                onError(new RuntimeException("could not place item on queue (queue.offer(item) returned false), item= " + t));
            }
        }

        void onError(Throwable th) {
            this.error = th;
            this.done = true;
            drain();
        }

        void onCompleted() {
            this.done = true;
            drain();
        }

        public void request(long j) {
            if (j > 0) {
                BackpressureUtils.getAndAddRequest(this, j);
                drain();
            }
        }

        private void drain() {
            if (this.child.isUnsubscribed() || this.drainRequested.getAndIncrement() != 0) {
                return;
            }
            this.worker.schedule(this);
        }

        public void call() {
            try {
                drainNow();
            } catch (Throwable th) {
                this.child.onError(th);
            }
        }

        private void drainNow() {
            long j;
            if (this.child.isUnsubscribed()) {
                return;
            }
            long j2 = get();
            while (true) {
                this.drainRequested.set(1);
                long j3 = 0;
                while (true) {
                    j = j3;
                    if (j2 <= 0) {
                        break;
                    }
                    T poll = this.queue.poll();
                    if (poll != null) {
                        if (NullSentinel.isNullSentinel(poll)) {
                            this.child.onNext((Object) null);
                        } else {
                            this.child.onNext(poll);
                        }
                        j2--;
                        j3 = j + 1;
                    } else if (finished()) {
                        return;
                    }
                }
                j2 = BackpressureUtils.produced(this, j);
                if (this.child.isUnsubscribed()) {
                    return;
                }
                if (j2 == 0 && finished()) {
                    return;
                }
            }
        }

        private boolean finished() {
            if (!this.done) {
                return this.drainRequested.compareAndSet(1, 0);
            }
            Throwable th = this.error;
            if (this.queue.isEmpty()) {
                this.queue.unsubscribe();
                if (th != null) {
                    this.child.onError(th);
                    return true;
                }
                this.child.onCompleted();
                return true;
            }
            if (th == null || this.delayError) {
                return this.drainRequested.compareAndSet(1, 0);
            }
            this.queue.unsubscribe();
            this.child.onError(th);
            return true;
        }
    }

    public OperatorBufferToFile(DataSerializer<T> dataSerializer, Scheduler scheduler, Options options) {
        Preconditions.checkNotNull(dataSerializer);
        Preconditions.checkNotNull(scheduler);
        Preconditions.checkNotNull(options);
        this.scheduler = scheduler;
        this.dataSerializer = dataSerializer;
        this.options = options;
    }

    public Subscriber<? super T> call(Subscriber<? super T> subscriber) {
        QueueWithResources createFileBasedQueue = createFileBasedQueue(this.dataSerializer, this.options);
        AtomicReference atomicReference = new AtomicReference();
        Scheduler.Worker createWorker = this.scheduler.createWorker();
        Observable create = Observable.create(new OnSubscribeFromQueue(atomicReference, createFileBasedQueue, createWorker, this.options));
        ParentSubscriber parentSubscriber = new ParentSubscriber(atomicReference);
        subscriber.add(parentSubscriber);
        subscriber.add(createFileBasedQueue);
        Subscriber wrap = Subscribers.wrap(subscriber);
        subscriber.add(createWorker);
        create.unsafeSubscribe(wrap);
        return parentSubscriber;
    }

    private static <T> QueueWithResources<T> createFileBasedQueue(final DataSerializer<T> dataSerializer, final Options options) {
        return (options.rolloverEvery() == Long.MAX_VALUE && options.rolloverSizeBytes() == Long.MAX_VALUE) ? new QueueWithResourcesNonBlockingUnsubscribe(new FileBasedSPSCQueue(options.bufferSizeBytes(), (File) options.fileFactory().call(), dataSerializer)) : new QueueWithResourcesNonBlockingUnsubscribe(new RollingSPSCQueue(new Func0<QueueWithResources<T>>() { // from class: com.github.davidmoten.rx.internal.operators.OperatorBufferToFile.1
            /* renamed from: call, reason: merged with bridge method [inline-methods] */
            public QueueWithResources<T> m47call() {
                return new FileBasedSPSCQueue(Options.this.bufferSizeBytes(), (File) Options.this.fileFactory().call(), dataSerializer);
            }
        }, options.rolloverSizeBytes(), options.rolloverEvery()));
    }
}
