package co.cask.cdap.data.stream.service.upload;

import co.cask.cdap.api.stream.StreamEventData;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.io.ByteBuffers;
import co.cask.cdap.data.stream.service.ConcurrentStreamWriter;
import co.cask.cdap.data.stream.service.MutableStreamEventData;
import co.cask.cdap.proto.Id;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/* loaded from: input_file:co/cask/cdap/data/stream/service/upload/BufferedContentWriter.class */
final class BufferedContentWriter implements ContentWriter, Iterable<ByteBuffer> {
    private final Id.Stream streamId;
    private final ConcurrentStreamWriter streamWriter;
    private final Map<String, String> headers;
    private final List<ByteBuffer> bodies = Lists.newLinkedList();

    /* loaded from: input_file:co/cask/cdap/data/stream/service/upload/BufferedContentWriter$StreamEventDataIterator.class */
    private static final class StreamEventDataIterator extends AbstractIterator<StreamEventData> {
        private final Iterator<? extends ByteBuffer> bodies;
        private final MutableStreamEventData streamEventData;

        private StreamEventDataIterator(Map<String, String> map, Iterator<? extends ByteBuffer> it) {
            this.bodies = it;
            this.streamEventData = new MutableStreamEventData().setHeaders(map);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
        public StreamEventData m41computeNext() {
            return !this.bodies.hasNext() ? (StreamEventData) endOfData() : this.streamEventData.setBody(this.bodies.next());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BufferedContentWriter(Id.Stream stream, ConcurrentStreamWriter concurrentStreamWriter, Map<String, String> map) {
        this.streamId = stream;
        this.streamWriter = concurrentStreamWriter;
        this.headers = ImmutableMap.copyOf(map);
    }

    @Override // co.cask.cdap.data.stream.service.upload.ContentWriter
    public void append(ByteBuffer byteBuffer, boolean z) throws IOException {
        if (z) {
            this.bodies.add(byteBuffer);
        } else {
            this.bodies.add(ByteBuffers.copy(byteBuffer));
        }
    }

    @Override // co.cask.cdap.data.stream.service.upload.ContentWriter
    public void appendAll(Iterator<ByteBuffer> it, boolean z) throws IOException {
        while (it.hasNext()) {
            append(it.next(), z);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.streamWriter.enqueue(this.streamId, new StreamEventDataIterator(this.headers, this.bodies.iterator()));
        } catch (NotFoundException e) {
            throw Throwables.propagate(e);
        }
    }

    public void cancel() {
    }

    @Override // java.lang.Iterable
    public Iterator<ByteBuffer> iterator() {
        return this.bodies.iterator();
    }
}
