package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SpillableSubpartition.class */
public class SpillableSubpartition extends ResultSubpartition {
    private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartition.class);
    final ArrayList<Buffer> buffers;
    final IOManager ioManager;
    final IOManager.IOMode ioMode;
    BufferFileWriter spillWriter;
    private boolean isFinished;
    private boolean isReleased;
    private ResultSubpartitionView readView;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SpillableSubpartition(int i, ResultPartition resultPartition, IOManager iOManager, IOManager.IOMode iOMode) {
        super(i, resultPartition);
        this.buffers = new ArrayList<>();
        this.ioManager = (IOManager) Preconditions.checkNotNull(iOManager);
        this.ioMode = (IOManager.IOMode) Preconditions.checkNotNull(iOMode);
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public boolean add(Buffer buffer) throws IOException {
        Preconditions.checkNotNull(buffer);
        synchronized (this.buffers) {
            if (this.isFinished || this.isReleased) {
                return false;
            }
            if (this.spillWriter == null) {
                this.buffers.add(buffer);
                return true;
            }
            this.spillWriter.writeBlock(buffer);
            return true;
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public void finish() throws IOException {
        synchronized (this.buffers) {
            if (add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE))) {
                if (this.spillWriter != null) {
                    this.spillWriter.close();
                }
                this.isFinished = true;
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public void release() throws IOException {
        synchronized (this.buffers) {
            if (this.isReleased) {
                return;
            }
            Iterator<Buffer> it = this.buffers.iterator();
            while (it.hasNext()) {
                it.next().recycle();
            }
            this.buffers.clear();
            this.buffers.trimToSize();
            if (this.spillWriter != null) {
                this.spillWriter.closeAndDelete();
            }
            ResultSubpartitionView resultSubpartitionView = this.readView;
            this.readView = null;
            this.isReleased = true;
            if (resultSubpartitionView != null) {
                resultSubpartitionView.notifySubpartitionConsumed();
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public int releaseMemory() throws IOException {
        synchronized (this.buffers) {
            if (this.spillWriter != null) {
                return 0;
            }
            this.spillWriter = this.ioManager.createBufferFileWriter(this.ioManager.createChannel());
            int size = this.buffers.size();
            for (int i = 0; i < size; i++) {
                this.spillWriter.writeBlock(this.buffers.remove(0));
            }
            LOG.debug("Spilling {} buffers of {}.", Integer.valueOf(size), this);
            return size;
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartition
    public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException {
        ResultSubpartitionView resultSubpartitionView;
        synchronized (this.buffers) {
            if (!this.isFinished) {
                throw new IllegalStateException("Subpartition has not been finished yet, but blocking subpartitions can only be consumed after they have been finished.");
            }
            if (this.readView != null) {
                throw new IllegalStateException("Subpartition is being or already has been consumed, but we currently allow subpartitions to only be consumed once.");
            }
            if (!(this.spillWriter != null && (this.spillWriter.isClosed() || this.spillWriter.getNumberOfOutstandingRequests() == 0))) {
                this.readView = new SpillableSubpartitionView(this, bufferProvider, this.buffers.size(), this.ioMode);
            } else if (this.ioMode.isSynchronous()) {
                this.readView = new SpilledSubpartitionViewSyncIO(this, bufferProvider.getMemorySegmentSize(), this.spillWriter.getChannelID(), 0L);
            } else {
                this.readView = new SpilledSubpartitionViewAsyncIO(this, bufferProvider, this.ioManager, this.spillWriter.getChannelID(), 0L);
            }
            resultSubpartitionView = this.readView;
        }
        return resultSubpartitionView;
    }

    public String toString() {
        Object[] objArr = new Object[5];
        objArr[0] = Integer.valueOf(getTotalNumberOfBuffers());
        objArr[1] = Long.valueOf(getTotalNumberOfBytes());
        objArr[2] = Boolean.valueOf(this.isFinished);
        objArr[3] = Boolean.valueOf(this.readView != null);
        objArr[4] = Boolean.valueOf(this.spillWriter != null);
        return String.format("SpillableSubpartition [%d number of buffers (%d bytes),finished? %s, read view? %s, spilled? %s]", objArr);
    }
}
