package com.linkedin.r2.netty.entitystream;

import com.linkedin.data.ByteString;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.r2.message.stream.entitystream.WriteHandle;
import com.linkedin.r2.message.stream.entitystream.Writer;
import com.linkedin.r2.netty.common.ChannelPipelineEvent;
import com.linkedin.r2.netty.common.NettyChannelAttributes;
import com.linkedin.r2.netty.common.StreamingTimeout;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.TooLongFrameException;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/r2/netty/entitystream/StreamWriter.class */
/**
 * A {@link Writer} that buffers {@link ByteString} chunks handed to it through
 * {@link #onDataAvailable(ByteString)} and forwards them to the {@link WriteHandle} supplied in
 * {@link #onInit(WriteHandle)} for as long as the handle reports remaining capacity.
 *
 * <p>Back-pressure is applied through the channel's auto-read flag: auto-read is switched off once
 * more than {@link #BUFFER_HIGH_WATER_MARK} bytes are buffered and switched back on once the
 * buffered amount drops below {@link #BUFFER_LOW_WATER_MARK}.
 *
 * <p>The buffer and the byte counters are plain (non-synchronized) fields. {@link #onWritePossible()}
 * re-dispatches to the channel executor when it is not already running on it; the other entry points
 * do no dispatching of their own, so they are presumably expected to be invoked on the event loop
 * (NOTE(review): confirm against callers).
 */
public class StreamWriter extends ChannelInboundHandlerAdapter implements Writer {
    private static final Logger LOG = LoggerFactory.getLogger(StreamWriter.class);

    /** Sentinel chunk marking the end of the stream; {@code doWrite} recognizes it by identity. */
    public static final ByteString EOF = ByteString.copy(new byte[0]);

    /** Auto-read is disabled once the number of buffered bytes exceeds this value. */
    private static final int BUFFER_HIGH_WATER_MARK = 24576;

    /** Auto-read is re-enabled once the number of buffered bytes falls below this value. */
    private static final int BUFFER_LOW_WATER_MARK = 8192;

    private final ChannelHandlerContext _ctx;
    private final long _maxContentLength;
    private volatile WriteHandle _wh;
    // Most recent failure reported while no write handle was available; replayed in onWritePossible().
    private volatile Throwable _failureBeforeInit;
    private final List<ByteString> _buffer = new LinkedList<>();
    private long _totalBytesWritten = 0;
    private int _bufferedBytes = 0;
    private boolean _errorRaised = false;

    /**
     * @param channelHandlerContext context of the channel whose auto-read flag, executor and
     *                              pipeline events this writer drives
     * @param maxContentLength      maximum cumulative number of bytes accepted by
     *                              {@link #onDataAvailable(ByteString)} before an error is raised
     */
    public StreamWriter(ChannelHandlerContext channelHandlerContext, long maxContentLength) {
        this._ctx = channelHandlerContext;
        this._maxContentLength = maxContentLength;
    }

    /**
     * Accepts one chunk of data, buffers it and, if a write handle is already available, attempts
     * to flush the buffer.
     *
     * <p>If accepting the chunk would push the cumulative total past the configured maximum, the
     * chunk is dropped and a {@link TooLongFrameException} is reported through
     * {@link #onError(Throwable)} instead.
     *
     * @param byteString the chunk to buffer; pass {@link #EOF} to signal end of stream
     */
    public void onDataAvailable(ByteString byteString) {
        final int length = byteString.length();
        if (length + this._totalBytesWritten > this._maxContentLength) {
            onError(new TooLongFrameException("HTTP content length exceeded " + this._maxContentLength + " bytes."));
            return;
        }
        this._totalBytesWritten += length;
        this._buffer.add(byteString);
        this._bufferedBytes += length;
        // Too much is buffered: stop reading from the channel until the buffer drains.
        if (this._bufferedBytes > BUFFER_HIGH_WATER_MARK && this._ctx.channel().config().isAutoRead()) {
            this._ctx.channel().config().setAutoRead(false);
        }
        if (this._wh != null) {
            doWrite();
        }
    }

    /**
     * Reports a failure. Before {@link #onInit(WriteHandle)} has supplied a write handle the failure
     * is only remembered (a later failure replaces an earlier one); afterwards it is forwarded to
     * the write handle wrapped in a {@link RemoteInvocationException}, at most once.
     *
     * @param th the failure to report
     */
    public void onError(Throwable th) {
        if (this._wh == null) {
            this._failureBeforeInit = th;
            return;
        }
        if (this._errorRaised) {
            return;
        }
        this._wh.error(new RemoteInvocationException(th));
        this._errorRaised = true;
    }

    /**
     * Stores the write handle and refreshes the streaming-timeout activity timestamp.
     *
     * @param writeHandle handle used for all subsequent writes
     */
    @Override // com.linkedin.r2.message.stream.entitystream.Writer
    public void onInit(WriteHandle writeHandle) {
        this._wh = writeHandle;
        refreshStreamLastActiveTime();
    }

    /**
     * Replays a failure recorded before initialization if there is one; otherwise flushes the
     * buffer, hopping onto the channel executor when called from a different thread.
     */
    @Override // com.linkedin.r2.message.stream.entitystream.Writer
    public void onWritePossible() {
        if (this._failureBeforeInit != null) {
            onError(this._failureBeforeInit);
        } else if (this._ctx.executor().inEventLoop()) {
            doWrite();
        } else {
            this._ctx.executor().execute(this::doWrite);
        }
    }

    /**
     * Logs the abort reason and propagates it through the channel pipeline as an exception event.
     *
     * @param th the reason the stream was aborted
     */
    @Override // com.linkedin.r2.message.stream.entitystream.Writer
    public void onAbort(Throwable th) {
        // toString() is passed explicitly so the rendered message stays "onAbort: <throwable>".
        LOG.error("onAbort: {}", th.toString());
        this._ctx.fireExceptionCaught(th);
    }

    /**
     * Moves buffered chunks to the write handle while it reports remaining capacity. On reaching
     * {@link #EOF} the handle is completed and {@link ChannelPipelineEvent#RESPONSE_COMPLETE} is
     * fired on the pipeline.
     */
    private void doWrite() {
        refreshStreamLastActiveTime();
        while (this._wh.remaining() > 0 && !this._buffer.isEmpty()) {
            final ByteString chunk = this._buffer.remove(0);
            if (chunk == EOF) {
                this._wh.done();
                this._ctx.fireUserEventTriggered(ChannelPipelineEvent.RESPONSE_COMPLETE);
                return;
            }
            this._wh.write(chunk);
            this._bufferedBytes -= chunk.length();
            // Buffer has drained enough: resume reading from the channel.
            if (!this._ctx.channel().config().isAutoRead() && this._bufferedBytes < BUFFER_LOW_WATER_MARK) {
                this._ctx.channel().config().setAutoRead(true);
            }
            refreshStreamLastActiveTime();
        }
    }

    /** Refreshes the last-active time of the channel's streaming timeout, if one is attached. */
    private void refreshStreamLastActiveTime() {
        StreamingTimeout streamingTimeout = (StreamingTimeout) this._ctx.channel().attr(NettyChannelAttributes.STREAMING_TIMEOUT_FUTURE).get();
        if (streamingTimeout != null) {
            streamingTimeout.refreshLastActiveTime();
        }
    }
}
