package com.azure.storage.common.implementation;

import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

/* loaded from: input_file:applicationinsights-agent-3.4.13.jar:inst/com/azure/storage/common/implementation/FluxInputStream.classdata */
/**
 * An {@link InputStream} backed by a {@code Flux<ByteBuffer>}.
 *
 * <p>The flux is subscribed to lazily, on the first call to {@link #read(byte[], int, int)}. Empty buffers are
 * filtered out, and one buffer at a time is requested from the subscription; each received buffer is copied into an
 * in-memory {@link ByteArrayInputStream} from which reads are served. When that buffer is drained, the reading thread
 * requests the next one and waits on a {@link Condition} until the flux signals data, completion or an error.</p>
 *
 * <p>An error signalled by the flux before any data was received is thrown from {@code read}. An error signalled
 * after data was received is recorded and thrown from {@link #close()}; {@code read} itself reports {@code -1} in
 * that case.</p>
 */
public class FluxInputStream extends InputStream {
    private static final ClientLogger LOGGER = new ClientLogger(FluxInputStream.class);

    /** Message used when the wait for data ended without data, completion, or a recorded error. */
    private static final String NO_DATA_NO_COMPLETION_MESSAGE = "An unexpected error occurred. No data was read from the stream but the stream did not indicate completion.";

    /** The source of the bytes served by this stream. */
    private final Flux<ByteBuffer> data;

    /** Set by the onSubscribe callback; stays {@code null} until the first read triggers the subscription. */
    private Subscription subscription;

    /** The most recently received chunk; {@code null} until the first chunk arrives. */
    private ByteArrayInputStream buffer;

    private boolean subscribed = false;
    private boolean fluxComplete = false;
    private boolean waitingForData = false;

    private final Lock lock = new ReentrantLock();
    private final Condition dataAvailable = this.lock.newCondition();

    /** The error signalled by the flux, if any, normalized to an {@link IOException}. */
    private IOException lastError;

    /**
     * Creates a stream that reads from the given flux.
     *
     * @param flux the flux providing the stream's content; it is not subscribed to until the first read.
     */
    public FluxInputStream(Flux<ByteBuffer> flux) {
        this.data = flux;
    }

    /**
     * Reads a single byte.
     *
     * @return the byte read as an unsigned value in the range {@code 0..255}, or {@code -1} at end of stream.
     * @throws IOException if the flux signalled an error before any data was received.
     */
    @Override
    public int read() throws IOException {
        byte[] single = new byte[1];
        if (read(single, 0, 1) == -1) {
            return -1;
        }
        // Mask to an unsigned value: returning the raw (signed) byte would yield negative numbers for bytes >= 0x80
        // and, for 0xFF, a value indistinguishable from the end-of-stream marker -1.
        return single[0] & 0xFF;
    }

    /**
     * Reads up to {@code len} bytes into {@code b}, blocking the calling thread until the flux delivers a chunk,
     * completes, or fails. At most the remainder of the current chunk is returned per call.
     *
     * @param b destination array.
     * @param off offset in {@code b} at which to start storing bytes.
     * @param len maximum number of bytes to read.
     * @return the number of bytes read, {@code 0} if {@code len} is zero, or {@code -1} once the flux has terminated
     * and the current chunk is drained.
     * @throws IOException if the flux signalled an error before any data was received.
     * @throws NullPointerException if {@code b} is {@code null}.
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or {@code len} exceeds
     * {@code b.length - off}.
     * @throws IllegalStateException if waiting for data ended without data, completion, or a recorded error.
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        validateParameters(b, off, len);
        if (len == 0) {
            return 0;
        }

        // First read: subscribe to the flux and wait for the first signal.
        if (!this.subscribed) {
            blockForData();
        }

        // No chunk was ever received: the flux either failed or completed empty.
        if (this.buffer == null) {
            if (this.lastError != null) {
                throw ((IOException) LOGGER.logThrowableAsError(this.lastError));
            }
            if (this.fluxComplete) {
                return -1;
            }
            throw LOGGER.logExceptionAsError(new IllegalStateException(NO_DATA_NO_COMPLETION_MESSAGE));
        }

        // Current chunk is drained: either we are done, or we need the next chunk.
        if (this.buffer.available() == 0) {
            if (this.fluxComplete) {
                return -1;
            }
            blockForData();
        }

        if (this.buffer.available() > 0) {
            return this.buffer.read(b, off, len);
        }
        if (this.fluxComplete) {
            return -1;
        }
        throw LOGGER.logExceptionAsError(new IllegalStateException(NO_DATA_NO_COMPLETION_MESSAGE));
    }

    /**
     * Cancels the subscription (if one was established), releases the current chunk and rethrows any error that the
     * flux signalled.
     *
     * @throws IOException if the flux signalled an error at any point.
     */
    @Override
    public void close() throws IOException {
        // The subscription only exists after the first read; closing an unread stream must not fail with an NPE.
        if (this.subscription != null) {
            this.subscription.cancel();
        }
        if (this.buffer != null) {
            this.buffer.close();
        }
        super.close();
        if (this.lastError != null) {
            throw ((IOException) LOGGER.logThrowableAsError(this.lastError));
        }
    }

    /**
     * Requests the next chunk (subscribing first if necessary) and blocks until the flux signals data, completion or
     * an error.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting thread is interrupted; the
     * thread's interrupt status is restored before throwing.
     */
    private void blockForData() {
        this.lock.lock();
        try {
            this.waitingForData = true;
            if (this.subscribed) {
                this.subscription.request(1L);
            } else {
                subscribeToData();
            }
            // Loop to guard against spurious wake-ups.
            while (this.waitingForData && !this.fluxComplete) {
                try {
                    this.dataAvailable.await();
                } catch (InterruptedException e) {
                    // Preserve the interrupt for callers further up the stack before converting to unchecked.
                    Thread.currentThread().interrupt();
                    throw LOGGER.logExceptionAsError(new RuntimeException(e));
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Subscribes to the flux, skipping empty buffers, and wires the callbacks that publish chunks, errors and
     * completion to the reading thread. The initial demand of one chunk is issued from the onSubscribe callback.
     */
    private void subscribeToData() {
        this.data.filter(ByteBuffer::hasRemaining).onBackpressureBuffer().subscribe(byteBuffer -> {
            // Copy the chunk so reads are served from an array owned by this stream.
            this.buffer = new ByteArrayInputStream(FluxUtil.byteBufferToArray(byteBuffer));
            this.lock.lock();
            try {
                this.waitingForData = false;
                this.dataAvailable.signal();
            } finally {
                this.lock.unlock();
            }
        }, throwable -> {
            // Normalize to IOException so read/close can rethrow it under the InputStream contract.
            if (throwable instanceof IOException) {
                this.lastError = (IOException) throwable;
            } else {
                this.lastError = new IOException(throwable);
            }
            signalOnCompleteOrError();
        }, this::signalOnCompleteOrError, subscription -> {
            this.subscription = subscription;
            this.subscribed = true;
            this.subscription.request(1L);
        });
    }

    /** Marks the flux as terminated and wakes the reading thread if it is waiting. */
    private void signalOnCompleteOrError() {
        this.fluxComplete = true;
        this.lock.lock();
        try {
            this.waitingForData = false;
            this.dataAvailable.signal();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Validates the arguments of {@link #read(byte[], int, int)}.
     *
     * @param b destination array; must not be {@code null}.
     * @param off start offset; must not be negative.
     * @param len number of bytes; must not be negative nor exceed {@code b.length - off}.
     */
    private void validateParameters(byte[] b, int off, int len) {
        if (b == null) {
            throw LOGGER.logExceptionAsError(new NullPointerException("'b' cannot be null"));
        }
        if (off < 0) {
            throw LOGGER.logExceptionAsError(new IndexOutOfBoundsException("'off' cannot be less than 0"));
        }
        if (len < 0) {
            throw LOGGER.logExceptionAsError(new IndexOutOfBoundsException("'len' cannot be less than 0"));
        }
        if (len > b.length - off) {
            throw LOGGER.logExceptionAsError(new IndexOutOfBoundsException("'len' cannot be greater than 'b'.length - 'off'"));
        }
    }
}
