package co.cask.cdap.data.stream.service.upload;

import co.cask.cdap.proto.id.StreamId;
import co.cask.http.BodyConsumer;
import co.cask.http.HttpResponder;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import javax.annotation.concurrent.NotThreadSafe;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferIndexFinder;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:co/cask/cdap/data/stream/service/upload/TextStreamBodyConsumer.class */
final class TextStreamBodyConsumer extends BodyConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(TextStreamBodyConsumer.class);
    private final StreamId streamId;
    private final ContentWriterFactory contentWriterFactory;
    private ContentWriter contentWriter;
    private boolean failed;
    private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER;

    /* JADX INFO: Access modifiers changed from: package-private */
    public TextStreamBodyConsumer(ContentWriterFactory contentWriterFactory) {
        this.streamId = contentWriterFactory.getStream();
        this.contentWriterFactory = contentWriterFactory;
    }

    public void chunk(ChannelBuffer channelBuffer, HttpResponder httpResponder) {
        if (this.failed) {
            return;
        }
        ChannelBuffer channelBuffer2 = channelBuffer;
        if (this.buffer.readable()) {
            channelBuffer2 = ChannelBuffers.wrappedBuffer(new ChannelBuffer[]{this.buffer, channelBuffer2});
            this.buffer = ChannelBuffers.EMPTY_BUFFER;
        }
        try {
            processChunk(channelBuffer2);
            if (channelBuffer2.readable()) {
                this.buffer = channelBuffer2;
            }
        } catch (Exception e) {
            this.failed = true;
            LOG.error("Failed to write upload content to stream {}", this.streamId, e);
            httpResponder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, "Failed to write uploaded content");
            throw Throwables.propagate(e);
        }
    }

    public void finished(HttpResponder httpResponder) {
        try {
            ContentWriter contentWriter = getContentWriter();
            if (this.buffer.readable()) {
                processChunk(this.buffer);
                if (this.buffer.readable()) {
                    contentWriter.append(this.buffer.toByteBuffer(), true);
                }
            }
            contentWriter.close();
            httpResponder.sendStatus(HttpResponseStatus.OK);
        } catch (Exception e) {
            LOG.error("Failed to write upload content to stream {}", this.streamId, e);
            httpResponder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, "Failed to write uploaded content");
        }
    }

    public void handleError(Throwable th) {
        LOG.warn("Failed to handle upload to stream {}", this.streamId, th);
    }

    private void processChunk(final ChannelBuffer channelBuffer) throws IOException {
        getContentWriter().appendAll(new AbstractIterator<ByteBuffer>() { // from class: co.cask.cdap.data.stream.service.upload.TextStreamBodyConsumer.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
            public ByteBuffer m46computeNext() {
                int bytesBefore = channelBuffer.bytesBefore(ChannelBufferIndexFinder.CRLF);
                if (bytesBefore <= 0) {
                    return (ByteBuffer) endOfData();
                }
                ChannelBuffer readSlice = channelBuffer.readSlice(bytesBefore);
                while (channelBuffer.bytesBefore(ChannelBufferIndexFinder.CRLF) == 0) {
                    channelBuffer.readByte();
                }
                return readSlice.toByteBuffer();
            }
        }, true);
    }

    private ContentWriter getContentWriter() throws IOException {
        if (this.contentWriter == null) {
            this.contentWriter = this.contentWriterFactory.create(ImmutableMap.of());
        }
        return this.contentWriter;
    }
}
