package com.linkedin.r2.transport.http.client;

import com.linkedin.data.ByteString;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.stream.entitystream.EntityStream;
import com.linkedin.r2.message.stream.entitystream.EntityStreams;
import com.linkedin.r2.message.stream.entitystream.ReadHandle;
import com.linkedin.r2.message.stream.entitystream.Reader;
import com.linkedin.r2.message.stream.entitystream.WriteHandle;
import com.linkedin.r2.message.stream.entitystream.Writer;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
import com.linkedin.r2.transport.common.bridge.common.TransportResponse;
import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/r2-netty-11.0.0.jar:com/linkedin/r2/transport/http/client/StreamExecutionCallback.class */
/**
 * A {@link TransportCallback} decorator that hands the response, and every subsequent
 * entity-stream event, to the wrapped callback through tasks run on the supplied executor.
 *
 * <p>Tasks are appended to a FIFO queue. {@code _pending} counts queued-but-unfinished tasks;
 * the event loop is submitted to the executor only when that counter goes from 0 to 1, and it
 * re-submits itself while tasks remain. As a result at most one event-loop run is scheduled at
 * any time and tasks execute one at a time in submission order.
 */
public class StreamExecutionCallback implements TransportCallback<StreamResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamExecutionCallback.class);

    private final ExecutorService _executor;
    // Holds the wrapped callback until the first response arrives; cleared afterwards.
    private final AtomicReference<TransportCallback<StreamResponse>> _callbackRef;
    private final Queue<Runnable> _taskQueue = new LinkedBlockingQueue<>();
    // Number of tasks that have been queued and have not finished running yet.
    private final AtomicInteger _pending = new AtomicInteger(0);

    /**
     * Runs exactly one queued task per invocation and re-submits itself if more tasks remain.
     */
    private final Runnable _eventLoop = new Runnable() {
        @Override
        public void run() {
            try {
                StreamExecutionCallback.this._taskQueue.poll().run();
            } catch (Throwable th) {
                StreamExecutionCallback.LOG.error("Unexpected throwable in eventLoop.", th);
                Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), th);
            } finally {
                // Account for the task exactly once, whatever happened above. Doing this in
                // finally (rather than inside the try) guarantees that a failure of execute()
                // cannot be caught by the handler above and trigger a second decrement.
                if (StreamExecutionCallback.this._pending.decrementAndGet() > 0) {
                    StreamExecutionCallback.this._executor.execute(this);
                }
            }
        }
    };

    /**
     * Bridges an underlying {@link EntityStream} to a new one: it reads from the underlying
     * stream and writes to the new stream's {@link WriteHandle}, performing the writes, the
     * read requests and the done/error notifications inside tasks passed to
     * {@link StreamExecutionCallback#trySchedule(Runnable)}.
     */
    private class EventLoopConnector implements Reader, Writer {
        private WriteHandle _wh;
        private ReadHandle _rh;
        // Chunks requested from the underlying stream but not yet written downstream.
        // Only read and written inside scheduled tasks.
        private int _outstanding = 0;
        private volatile boolean _aborted = false;
        private final EntityStream _underlying;

        /**
         * @param entityStream the stream to read from once the new stream's writer is initialised
         */
        public EventLoopConnector(EntityStream entityStream) {
            this._underlying = entityStream;
        }

        /** Stores the handle used to request data from, or cancel, the underlying stream. */
        @Override
        public void onInit(ReadHandle readHandle) {
            this._rh = readHandle;
        }

        /** Stores the downstream write handle, then registers this object as the underlying stream's reader. */
        @Override
        public void onInit(WriteHandle writeHandle) {
            this._wh = writeHandle;
            this._underlying.setReader(this);
        }

        /**
         * Schedules a task that writes the chunk downstream and tops up the read request.
         * Chunks arriving after an abort are dropped.
         */
        @Override
        public void onDataAvailable(ByteString byteString) {
            if (this._aborted) {
                return;
            }
            StreamExecutionCallback.this.trySchedule(() -> {
                this._outstanding--;
                this._wh.write(byteString);
                // Request only the capacity that is not already covered by outstanding requests.
                int remaining = this._wh.remaining() - this._outstanding;
                if (remaining > 0) {
                    this._rh.request(remaining);
                    this._outstanding += remaining;
                }
            });
        }

        /** Schedules the downstream {@code done()} notification. */
        @Override
        public void onDone() {
            // The method reference binds (and null-checks) _wh now, not when the task runs.
            StreamExecutionCallback.this.trySchedule(this._wh::done);
        }

        /** Schedules propagation of the error to the downstream write handle. */
        @Override
        public void onError(Throwable th) {
            StreamExecutionCallback.this.trySchedule(() -> this._wh.error(th));
        }

        /** Schedules a read request sized to the downstream handle's remaining capacity. */
        @Override
        public void onWritePossible() {
            StreamExecutionCallback.this.trySchedule(() -> {
                this._outstanding = this._wh.remaining();
                this._rh.request(this._outstanding);
            });
        }

        /**
         * Marks the connector aborted and cancels the underlying read. This runs directly on the
         * calling thread; it is not routed through the task queue.
         */
        @Override
        public void onAbort(Throwable th) {
            this._aborted = true;
            this._rh.cancel();
        }
    }

    /**
     * @param executorService   executor on which queued tasks are run
     * @param transportCallback callback that receives the (wrapped) response
     */
    public StreamExecutionCallback(ExecutorService executorService, TransportCallback<StreamResponse> transportCallback) {
        this._executor = executorService;
        this._callbackRef = new AtomicReference<>(transportCallback);
    }

    /**
     * Queues a task and submits the event loop if no task was pending before this one.
     *
     * @param runnable the task to run on the executor
     */
    public void trySchedule(Runnable runnable) {
        // Enqueue before incrementing so that a positive counter always implies a queued task.
        this._taskQueue.add(runnable);
        if (this._pending.incrementAndGet() == 1) {
            this._executor.execute(this._eventLoop);
        }
    }

    /**
     * Forwards the response to the wrapped callback from a scheduled task. Only the first
     * invocation is forwarded; later ones are logged and ignored. A successful response is
     * rebuilt around a new entity stream backed by an {@link EventLoopConnector}; an error
     * response is passed through unchanged.
     *
     * @param transportResponse the response received from the transport layer
     */
    @Override
    public void onResponse(TransportResponse<StreamResponse> transportResponse) {
        final TransportCallback<StreamResponse> callback = this._callbackRef.getAndSet(null);
        if (callback == null) {
            LOG.warn("Received response {} while _callback is null. Ignored.", transportResponse.getResponse());
            return;
        }

        final TransportResponse<StreamResponse> wrapped;
        if (transportResponse.hasError()) {
            wrapped = transportResponse;
        } else {
            StreamResponse original = transportResponse.getResponse();
            EntityStream bridged = EntityStreams.newEntityStream(new EventLoopConnector(original.getEntityStream()));
            wrapped = TransportResponseImpl.success(original.builder().build(bridged), transportResponse.getWireAttributes());
        }
        trySchedule(() -> callback.onResponse(wrapped));
    }
}
