package org.apache.druid.frame.channel;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.io.IOUtils;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.druid.frame.Frame;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;

/* loaded from: input_file:org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.class */
public class ReadableInputStreamFrameChannel implements ReadableFrameChannel {
    private final InputStream inputStream;
    private final ReadableByteChunksFrameChannel delegate;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final byte[] buffer = new byte[8192];

    @GuardedBy("lock")
    private long totalInputStreamBytesRead = 0;

    @GuardedBy("lock")
    private boolean inputStreamFinished = false;

    @GuardedBy("lock")
    private boolean inputStreamError = false;
    private boolean isStarted = false;
    private volatile boolean keepReading = true;
    private final Object readMonitor = new Object();
    private final ExecutorService executorService;
    private static final int BASE_SLEEP_MILLIS = 100;
    private static final int MAX_SLEEP_MILLIS = 2000;

    private ReadableInputStreamFrameChannel(InputStream inputStream, ReadableByteChunksFrameChannel readableByteChunksFrameChannel, ExecutorService executorService) {
        this.inputStream = inputStream;
        this.delegate = readableByteChunksFrameChannel;
        this.executorService = executorService;
    }

    public static ReadableInputStreamFrameChannel open(InputStream inputStream, String str, ExecutorService executorService, boolean z) {
        return new ReadableInputStreamFrameChannel(inputStream, ReadableByteChunksFrameChannel.create(str, z), executorService);
    }

    @Override // org.apache.druid.frame.channel.ReadableFrameChannel
    public boolean isFinished() {
        boolean isFinished;
        synchronized (this.lock) {
            startReading();
            isFinished = this.delegate.isFinished();
        }
        return isFinished;
    }

    @Override // org.apache.druid.frame.channel.ReadableFrameChannel
    public boolean canRead() {
        boolean canRead;
        synchronized (this.lock) {
            startReading();
            canRead = this.delegate.canRead();
        }
        return canRead;
    }

    @Override // org.apache.druid.frame.channel.ReadableFrameChannel
    public Frame read() {
        Frame read;
        synchronized (this.lock) {
            startReading();
            read = this.delegate.read();
        }
        return read;
    }

    @Override // org.apache.druid.frame.channel.ReadableFrameChannel
    public ListenableFuture<?> readabilityFuture() {
        ListenableFuture<?> readabilityFuture;
        synchronized (this.lock) {
            startReading();
            readabilityFuture = this.delegate.readabilityFuture();
        }
        return readabilityFuture;
    }

    @Override // org.apache.druid.frame.channel.ReadableFrameChannel, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        synchronized (this.lock) {
            this.inputStreamFinished = true;
            this.delegate.close();
            IOUtils.closeQuietly(this.inputStream);
        }
    }

    private void startReading() {
        if (this.isStarted) {
            return;
        }
        this.isStarted = true;
        this.executorService.submit(() -> {
            int i = 1;
            while (true) {
                if (this.keepReading) {
                    synchronized (this.lock) {
                        i = 1;
                        if (this.inputStreamFinished) {
                            this.delegate.doneWriting();
                            return;
                        }
                        try {
                            int read = this.inputStream.read(this.buffer);
                            if (read == -1) {
                                this.inputStreamFinished = true;
                                this.delegate.doneWriting();
                                IOUtils.closeQuietly(this.inputStream);
                                return;
                            } else {
                                ListenableFuture<?> addChunk = this.delegate.addChunk(Arrays.copyOfRange(this.buffer, 0, read));
                                this.totalInputStreamBytesRead += read;
                                if (addChunk != null) {
                                    this.keepReading = false;
                                    addChunk.addListener(() -> {
                                        synchronized (this.readMonitor) {
                                            this.keepReading = true;
                                            this.readMonitor.notify();
                                        }
                                    }, Execs.directExecutor());
                                } else {
                                    this.keepReading = true;
                                }
                            }
                        } catch (Exception e) {
                            this.delegate.setError(new ISE(e, "Found error while reading input stream at %d", Long.valueOf(this.totalInputStreamBytesRead)));
                            this.inputStreamError = true;
                            IOUtils.closeQuietly(this.inputStream);
                            return;
                        }
                    }
                } else {
                    try {
                        synchronized (this.readMonitor) {
                            if (!this.keepReading) {
                                this.readMonitor.wait(nextRetrySleepMillis(i));
                            }
                        }
                        synchronized (this.lock) {
                            if (this.inputStreamFinished || this.inputStreamError || this.delegate.isErrorOrFinished()) {
                                break;
                            }
                        }
                        i++;
                    } catch (InterruptedException e2) {
                        IOUtils.closeQuietly(this.inputStream);
                        throw new ISE(e2, Thread.currentThread().getName() + "interrupted", new Object[0]);
                    }
                }
            }
        });
    }

    private static long nextRetrySleepMillis(int i) {
        return (long) (Math.min(2000.0d, 100.0d * Math.pow(2.0d, i - 1)) * Math.min(Math.max(1.0d + (0.2d * ThreadLocalRandom.current().nextGaussian()), CMAESOptimizer.DEFAULT_STOPFITNESS), 2.0d));
    }
}
