package org.apache.flink.iteration.datacache.nonkeyed;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.table.runtime.util.MemorySegmentPool;

@Internal
/* loaded from: input_file:org/apache/flink/iteration/datacache/nonkeyed/MemorySegmentWriter.class */
class MemorySegmentWriter<T> implements SegmentWriter<T> {
    private final TypeSerializer<T> serializer;
    private final Path path;
    private final MemorySegmentPool segmentPool;
    private final ManagedMemoryOutputStream outputStream;
    private final DataOutputView outputView;
    private int count = 0;

    /* loaded from: input_file:org/apache/flink/iteration/datacache/nonkeyed/MemorySegmentWriter$ManagedMemoryOutputStream.class */
    private static class ManagedMemoryOutputStream extends OutputStream {
        private final MemorySegmentPool segmentPool;
        private final int pageSize;
        private final List<MemorySegment> segments = new ArrayList();
        private int segmentIndex;
        private int segmentOffset;
        private long globalOffset;
        private long allocatedBytes;

        public ManagedMemoryOutputStream(MemorySegmentPool memorySegmentPool, long j) throws MemoryAllocationException {
            this.segmentPool = memorySegmentPool;
            this.pageSize = memorySegmentPool.pageSize();
            ensureCapacity(Math.max(j, 1L));
        }

        public long getPos() {
            return this.globalOffset;
        }

        public List<MemorySegment> getSegments() {
            return this.segments;
        }

        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
            write(new byte[]{(byte) i}, 0, 1);
        }

        @Override // java.io.OutputStream
        public void write(@Nullable byte[] bArr, int i, int i2) throws IOException {
            try {
                ensureCapacity(this.globalOffset + i2);
                while (i2 > 0) {
                    int min = Math.min(i2, this.pageSize - this.segmentOffset);
                    this.segments.get(this.segmentIndex).put(this.segmentOffset, bArr, i, min);
                    this.segmentOffset += min;
                    this.globalOffset += min;
                    if (this.segmentOffset >= this.pageSize) {
                        this.segmentIndex++;
                        this.segmentOffset = 0;
                    }
                    i += min;
                    i2 -= min;
                }
            } catch (MemoryAllocationException e) {
                throw new RuntimeException((Throwable) e);
            }
        }

        private void ensureCapacity(long j) throws MemoryAllocationException {
            if (this.allocatedBytes >= j) {
                return;
            }
            int size = ((int) (j % ((long) this.pageSize) == 0 ? j / this.pageSize : (j / this.pageSize) + 1)) - this.segments.size();
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < size; i++) {
                MemorySegment nextSegment = this.segmentPool.nextSegment();
                if (nextSegment == null) {
                    this.segmentPool.returnAll(arrayList);
                    throw new MemoryAllocationException();
                }
                arrayList.add(nextSegment);
            }
            this.segments.addAll(arrayList);
            this.allocatedBytes += arrayList.size() * this.pageSize;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MemorySegmentWriter(TypeSerializer<T> typeSerializer, Path path, MemorySegmentPool memorySegmentPool, long j) throws MemoryAllocationException {
        this.serializer = typeSerializer;
        this.path = path;
        this.segmentPool = memorySegmentPool;
        this.outputStream = new ManagedMemoryOutputStream(memorySegmentPool, j);
        this.outputView = new DataOutputViewStreamWrapper(this.outputStream);
    }

    @Override // org.apache.flink.iteration.datacache.nonkeyed.SegmentWriter
    public boolean addRecord(T t) throws IOException {
        if (this.outputStream.getPos() >= 1073741824) {
            return false;
        }
        try {
            this.serializer.serialize(t, this.outputView);
            this.count++;
            return true;
        } catch (RuntimeException e) {
            if (e.getCause() instanceof MemoryAllocationException) {
                return false;
            }
            throw e;
        }
    }

    @Override // org.apache.flink.iteration.datacache.nonkeyed.SegmentWriter
    public Optional<Segment> finish() throws IOException {
        if (this.count > 0) {
            return Optional.of(new Segment(this.path, this.count, this.outputStream.getSegments()));
        }
        this.segmentPool.returnAll(this.outputStream.getSegments());
        return Optional.empty();
    }
}
