package com.linkedin.r2.netty.entitystream;

import com.linkedin.data.ByteString;
import com.linkedin.r2.message.stream.entitystream.ReadHandle;
import com.linkedin.r2.message.stream.entitystream.Reader;
import com.linkedin.r2.netty.common.NettyChannelAttributes;
import com.linkedin.r2.netty.common.StreamingTimeout;
import io.netty.channel.ChannelHandlerContext;

/* loaded from: input_file:com/linkedin/r2/netty/entitystream/StreamReader.class */
public class StreamReader implements Reader {
    public static final ByteString EOF = ByteString.copy(new byte[0]);
    private static final int REQUEST_CHUNKS = 1;
    private static final int MAX_BUFFERED_CHUNKS = 8;
    private static final int FLUSH_THRESHOLD = 8192;
    private final ChannelHandlerContext _ctx;
    private int _notFlushedBytes;
    private int _notFlushedChunks;
    private volatile ReadHandle _rh;

    public StreamReader(ChannelHandlerContext channelHandlerContext) {
        this._ctx = channelHandlerContext;
    }

    @Override // com.linkedin.r2.message.stream.entitystream.Reader
    public void onInit(ReadHandle readHandle) {
        this._rh = readHandle;
        refreshStreamLastActiveTime();
        this._rh.request(8);
    }

    @Override // com.linkedin.r2.message.stream.entitystream.Observer
    public void onDataAvailable(ByteString byteString) {
        refreshStreamLastActiveTime();
        this._ctx.write(byteString).addListener2(future -> {
            this._rh.request(1);
        });
        this._notFlushedBytes += byteString.length();
        this._notFlushedChunks++;
        if (this._notFlushedBytes >= 8192 || this._notFlushedChunks == 8) {
            this._ctx.flush();
            this._notFlushedBytes = 0;
            this._notFlushedChunks = 0;
        }
    }

    @Override // com.linkedin.r2.message.stream.entitystream.Observer
    public void onDone() {
        this._ctx.writeAndFlush(EOF);
    }

    @Override // com.linkedin.r2.message.stream.entitystream.Observer
    public void onError(Throwable th) {
        this._ctx.fireExceptionCaught(th);
    }

    private void refreshStreamLastActiveTime() {
        StreamingTimeout streamingTimeout = (StreamingTimeout) this._ctx.channel().attr(NettyChannelAttributes.STREAMING_TIMEOUT_FUTURE).get();
        if (streamingTimeout != null) {
            streamingTimeout.refreshLastActiveTime();
        }
    }
}
