package ru.tinkoff.kora.http.client.async.response;

import jakarta.annotation.Nullable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/* loaded from: input_file:ru/tinkoff/kora/http/client/async/response/QueuePublisher.class */
/**
 * Single-consumer bridge between blocking producers and a {@link Flow.Subscriber}.
 *
 * <p>Producers hand signals over with {@link #next(Object)}, {@link #error(Throwable)} and
 * {@link #complete()}. Signals are buffered in a bounded queue; when the buffer is full the
 * producer blocks in {@code put} until the subscriber consumes something or the stream is closed.
 * Elements are delivered only against demand signalled through {@link Flow.Subscription#request(long)};
 * terminal signals are delivered as soon as they reach the head of the buffer, regardless of demand.
 *
 * <p>The inherited boolean is the "closed" flag: it becomes {@code true} once the stream was
 * cancelled or a terminal signal was delivered. After that every producer call is a no-op.
 *
 * <p>Delivery is serialized with a work-in-progress counter, so the subscriber never observes
 * concurrent or re-entrant-overlapping signals even when producers and {@code request} run on
 * different threads. The publisher is meant for a single subscriber; a later {@code subscribe}
 * call replaces the previously registered subscriber.
 */
public class QueuePublisher<T> extends AtomicBoolean implements Flow.Publisher<T> {
    /** Capacity of the hand-off buffer; producers block once this many signals are pending. */
    private static final int QUEUE_CAPACITY = 16;

    // Raw type arguments: a field updater cannot be created for a parameterized class literal.
    @SuppressWarnings("rawtypes")
    private static final AtomicLongFieldUpdater<QueuePublisher> DEMAND =
        AtomicLongFieldUpdater.newUpdater(QueuePublisher.class, "demand");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<QueuePublisher> WIP =
        AtomicIntegerFieldUpdater.newUpdater(QueuePublisher.class, "wip");

    private final BlockingQueue<Signal<T>> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    /** Current subscriber; published only after its {@code onSubscribe} has returned. */
    private volatile Flow.Subscriber<? super T> delegate;
    /** Out-of-band failure (bad request amount, interrupted producer) that bypasses the buffer. */
    private volatile Throwable failure;
    /** Outstanding demand, accessed through {@link #DEMAND}. */
    private volatile long demand;
    /** Work-in-progress counter guarding the drain section, accessed through {@link #WIP}. */
    private volatile int wip;

    /**
     * One buffered signal: an element ({@code value != null}), a failure ({@code error != null})
     * or completion (both {@code null}).
     */
    public static final class Signal<T> {
        // Nullable: absent for terminal signals.
        private final T value;
        // Nullable: present only for the failure signal.
        private final Throwable error;

        private Signal(T value, Throwable error) {
            assert value == null || error == null;
            this.value = value;
            this.error = error;
        }
    }

    /** Creates an open publisher with an empty buffer and no demand. */
    public QueuePublisher() {
        super(false);
    }

    /**
     * Registers the subscriber and hands it a subscription.
     *
     * @param subscriber receiver of the buffered signals
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("subscriber must not be null");
        }
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                if (n <= 0) {
                    // A non-positive request is a protocol violation: fail the stream instead of corrupting demand.
                    failure = new IllegalArgumentException("non-positive subscription request: " + n);
                } else {
                    addDemand(n);
                }
                drain();
            }

            @Override
            public void cancel() {
                QueuePublisher.this.cancel();
            }
        });
        // Publish the subscriber only now, so nothing can be delivered before onSubscribe has returned;
        // demand requested from inside onSubscribe is served by the drain below.
        this.delegate = subscriber;
        drain();
    }

    /**
     * Enqueues an element, blocking while the buffer is full. Ignored once the stream is closed.
     *
     * @param t element to emit
     * @throws NullPointerException if {@code t} is {@code null} (it would be indistinguishable from completion)
     */
    public void next(T t) {
        if (t == null) {
            throw new NullPointerException("element must not be null");
        }
        if (get()) {
            return;
        }
        enqueue(new Signal<>(t, null));
    }

    /**
     * Enqueues a failure that terminates the stream after the elements buffered before it.
     * Ignored once the stream is closed.
     *
     * @param th failure to deliver; {@code null} is replaced by a {@link NullPointerException}
     */
    public void error(Throwable th) {
        if (get()) {
            return;
        }
        Throwable cause = th != null ? th : new NullPointerException("error signal must not be null");
        enqueue(new Signal<>(null, cause));
    }

    /**
     * Enqueues completion that terminates the stream after the elements buffered before it.
     * Ignored once the stream is closed.
     */
    public void complete() {
        if (get()) {
            return;
        }
        enqueue(new Signal<>(null, null));
    }

    /**
     * Closes the stream without signalling the subscriber and discards buffered signals,
     * which also releases producers blocked on a full buffer. Never blocks.
     */
    public void cancel() {
        set(true);
        drain();
    }

    /** Buffers the signal (blocking while full) and then tries to deliver. */
    private void enqueue(Signal<T> signal) {
        try {
            this.queue.put(signal);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller and fail the stream rather than losing the signal silently.
            Thread.currentThread().interrupt();
            this.failure = signal.error != null ? signal.error : e;
        }
        drain();
    }

    /** Adds {@code n} to the outstanding demand, saturating at {@link Long#MAX_VALUE}. */
    private void addDemand(long n) {
        long current;
        long updated;
        do {
            current = this.demand;
            updated = current + n;
            if (updated < 0) {
                updated = Long.MAX_VALUE;
            }
        } while (!DEMAND.compareAndSet(this, current, updated));
    }

    /**
     * Serialized delivery entry point. Only the caller that moves the counter from zero delivers;
     * everyone else just bumps the counter, which makes the delivering thread run another pass,
     * so a signal enqueued while a pass is finishing is never stranded.
     */
    private void drain() {
        if (WIP.getAndIncrement(this) != 0) {
            return;
        }
        int missed = 1;
        do {
            try {
                emitAvailable();
            } catch (Throwable th) {
                // A throwing subscriber is treated as cancelled; free blocked producers and release the drain section.
                set(true);
                this.queue.clear();
                WIP.set(this, 0);
                throw th;
            }
            missed = WIP.addAndGet(this, -missed);
        } while (missed != 0);
    }

    /** Delivers whatever the current state allows; must be called only from {@link #drain()}. */
    private void emitAvailable() {
        Flow.Subscriber<? super T> subscriber = this.delegate;
        for (;;) {
            if (get()) {
                this.queue.clear();
                return;
            }
            if (subscriber == null) {
                return;
            }
            Throwable fatal = this.failure;
            if (fatal != null) {
                close();
                subscriber.onError(fatal);
                return;
            }
            Signal<T> head = this.queue.peek();
            if (head == null) {
                return;
            }
            if (head.value != null) {
                if (this.demand <= 0) {
                    return;
                }
                this.queue.poll();
                DEMAND.decrementAndGet(this);
                subscriber.onNext(head.value);
            } else {
                // Terminal signals need no demand and must be the last thing the subscriber sees.
                close();
                if (head.error != null) {
                    subscriber.onError(head.error);
                } else {
                    subscriber.onComplete();
                }
                return;
            }
        }
    }

    /** Marks the stream closed and drops everything still buffered. */
    private void close() {
        set(true);
        this.queue.clear();
    }
}
