package ru.tinkoff.kora.common.util;

import jakarta.annotation.Nonnull;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;
import reactor.util.context.Context;
import ru.tinkoff.kora.common.Context;

/* loaded from: input_file:ru/tinkoff/kora/common/util/ByteBufferFluxInputStream.class */
public class ByteBufferFluxInputStream extends InputStream implements CoreSubscriber<ByteBuffer> {
    private final BlockingQueue<Signal<ByteBuffer>> queue = new ArrayBlockingQueue(4);
    private final AtomicInteger demand = new AtomicInteger(1);
    private final Context context = Context.Reactor.inject(reactor.util.context.Context.empty(), ru.tinkoff.kora.common.Context.current());
    private ByteBuffer currentBuffer = null;
    private volatile Subscription subscription = null;
    private volatile boolean completed = false;

    public ByteBufferFluxInputStream(Flux<ByteBuffer> flux) {
        flux.subscribe(this);
    }

    @Override // java.io.InputStream
    public int read() {
        byte[] bArr = new byte[1];
        if (read(bArr, 0, 1) <= 0) {
            return -1;
        }
        return bArr[0];
    }

    @Override // java.io.InputStream
    public int read(@Nonnull byte[] bArr, int i, int i2) {
        if (this.completed) {
            return -1;
        }
        while (true) {
            if (this.currentBuffer != null && this.currentBuffer.hasRemaining()) {
                int min = Math.min(i2, this.currentBuffer.remaining());
                this.currentBuffer.get(bArr, i, min);
                if (!this.currentBuffer.hasRemaining()) {
                    this.currentBuffer = null;
                }
                return min;
            }
            if (this.demand.compareAndSet(0, 1)) {
                this.subscription.request(1L);
            }
            try {
                Signal<ByteBuffer> take = this.queue.take();
                if (take.isOnNext()) {
                    this.currentBuffer = (ByteBuffer) take.get();
                } else {
                    if (take.isOnError()) {
                        this.completed = true;
                        throw toRuntimeException(take.getThrowable());
                    }
                    if (take.isOnComplete()) {
                        this.completed = true;
                        return -1;
                    }
                }
            } catch (InterruptedException e) {
                this.completed = true;
                if (this.subscription != null) {
                    this.subscription.cancel();
                }
                throw toRuntimeException(e);
            }
        }
    }

    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.completed) {
            return;
        }
        this.completed = true;
        this.subscription.cancel();
        this.currentBuffer = null;
        this.queue.clear();
    }

    private RuntimeException toRuntimeException(Throwable th) {
        Throwable unwrap = Exceptions.unwrap(th);
        return unwrap instanceof RuntimeException ? (RuntimeException) unwrap : new RuntimeException(unwrap);
    }

    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        if (this.completed) {
            subscription.cancel();
        } else {
            subscription.request(1L);
        }
    }

    public void onNext(ByteBuffer byteBuffer) {
        if (this.completed) {
            return;
        }
        this.demand.decrementAndGet();
        this.queue.offer(Signal.next(byteBuffer));
    }

    public void onError(Throwable th) {
        this.queue.offer(Signal.error(th));
    }

    public void onComplete() {
        this.queue.offer(Signal.complete());
    }

    @Nonnull
    public reactor.util.context.Context currentContext() {
        return this.context;
    }
}
