package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.class */
/**
 * A {@link ResultSubpartitionView} that reads its data from a list of {@link NettyPayloadManager}
 * queues.
 *
 * <p>At most one queue is selected at a time: the one whose head payload carried the currently
 * required segment id. Buffers are read from that queue until a buffer of type {@link
 * Buffer.DataType#END_OF_SEGMENT} is returned. After that the view hands out nothing until {@link
 * #notifyRequiredSegmentId(int)} is called with a larger segment id.
 *
 * <p>Only the released flag is {@code volatile}; the remaining mutable state is not synchronized
 * by this class.
 */
public class TieredStorageResultSubpartitionView implements ResultSubpartitionView {

    /** Value of {@link #managerIndexContainsCurrentSegment} while no queue is selected. */
    private static final int NO_MANAGER_SELECTED = -1;

    /**
     * Segment id this view expects on every payload it reads as data or error; the payload that
     * selects a queue must carry a different value.
     */
    private static final int NO_SEGMENT_ID = -1;

    private final BufferAvailabilityListener availabilityListener;

    /** Payload queues; entry {@code i} belongs to producer {@code i} and connection id {@code i}. */
    private final List<NettyPayloadManager> nettyPayloadManagers;

    private final List<NettyServiceProducer> serviceProducers;

    private final List<NettyConnectionId> nettyConnectionIds;

    private volatile boolean isReleased = false;

    /** Segment id a queue head must carry for that queue to be selected. */
    private int requiredSegmentId = 0;

    /** Set once an END_OF_SEGMENT buffer was handed out; cleared by a newer required segment id. */
    private boolean stopSendingData = false;

    /** Index of the selected queue, or {@link #NO_MANAGER_SELECTED}. */
    private int managerIndexContainsCurrentSegment = NO_MANAGER_SELECTED;

    /** Sequence number of the last buffer handed out; the first buffer gets 0. */
    private int currentSequenceNumber = -1;

    /**
     * Creates the view. The three lists are read index-wise in parallel when resources are
     * released, so they are expected to line up.
     *
     * @param availabilityListener notified when a newer segment id is required
     * @param nettyPayloadManagers the payload queues to read from
     * @param nettyConnectionIds connection id reported for each queue on release
     * @param serviceProducers producer informed about the broken connection for each queue
     */
    public TieredStorageResultSubpartitionView(
            BufferAvailabilityListener availabilityListener,
            List<NettyPayloadManager> nettyPayloadManagers,
            List<NettyConnectionId> nettyConnectionIds,
            List<NettyServiceProducer> serviceProducers) {
        this.availabilityListener = availabilityListener;
        this.nettyPayloadManagers = nettyPayloadManagers;
        this.nettyConnectionIds = nettyConnectionIds;
        this.serviceProducers = serviceProducers;
    }

    /**
     * Returns the next buffer of the current segment together with a lookahead data type, the
     * backlog and the buffer's sequence number.
     *
     * @return the next buffer, or {@code null} if sending is stopped, no queue holds the required
     *     segment, or the selected queue is empty or yields a payload without a buffer
     * @throws IOException if the polled payload carries an error; all resources are released
     *     before the exception is thrown
     */
    @Nullable
    @Override
    public ResultSubpartition.BufferAndBacklog getNextBuffer() throws IOException {
        if (stopSendingData || !findCurrentNettyPayloadQueue()) {
            return null;
        }

        NettyPayloadManager currentManager =
                nettyPayloadManagers.get(managerIndexContainsCurrentSegment);
        Optional<Buffer> polled = readNettyPayload(currentManager);
        if (!polled.isPresent()) {
            return null;
        }

        Buffer buffer = polled.get();
        stopSendingData = buffer.getDataType() == Buffer.DataType.END_OF_SEGMENT;
        if (stopSendingData) {
            // The segment is finished: drop the selection so that the next lookup searches all
            // queues again. As a consequence getBacklog() below reports 0 for this buffer.
            managerIndexContainsCurrentSegment = NO_MANAGER_SELECTED;
        }
        currentSequenceNumber++;

        return ResultSubpartition.BufferAndBacklog.fromBufferAndLookahead(
                buffer, getDataType(currentManager.peek()), getBacklog(), currentSequenceNumber);
    }

    /**
     * Reports whether data can be sent and how large the backlog of the selected queue is.
     *
     * @param numCreditsAvailable credits of the consumer; with a positive value the view is
     *     available, with exactly zero it is available only if the head of the selected queue is
     *     an error or a non-buffer (event) payload
     * @return unavailable with backlog 0 if no queue holds the required segment, otherwise the
     *     availability described above with the selected queue's backlog
     */
    @Override
    public ResultSubpartitionView.AvailabilityWithBacklog getAvailabilityAndBacklog(
            int numCreditsAvailable) {
        if (!findCurrentNettyPayloadQueue()) {
            return new ResultSubpartitionView.AvailabilityWithBacklog(false, 0);
        }

        NettyPayloadManager currentManager =
                nettyPayloadManagers.get(managerIndexContainsCurrentSegment);
        boolean available =
                numCreditsAvailable > 0
                        || (numCreditsAvailable == 0 && isEventOrError(currentManager));
        return new ResultSubpartitionView.AvailabilityWithBacklog(available, getBacklog());
    }

    /**
     * Advances the required segment id, re-enables sending and notifies the availability listener.
     * Ids that are not larger than the current one are ignored.
     *
     * @param segmentId the segment id requested next
     */
    @Override
    public void notifyRequiredSegmentId(int segmentId) {
        if (segmentId > requiredSegmentId) {
            requiredSegmentId = segmentId;
            stopSendingData = false;
            availabilityListener.notifyDataAvailable();
        }
    }

    /**
     * Drains every queue, recycling the buffers still queued, and reports each connection as
     * broken to its producer. Calls after the first one return immediately.
     */
    @Override
    public void releaseAllResources() throws IOException {
        if (isReleased) {
            return;
        }
        isReleased = true;
        for (int index = 0; index < nettyPayloadManagers.size(); index++) {
            releaseQueue(
                    nettyPayloadManagers.get(index),
                    serviceProducers.get(index),
                    nettyConnectionIds.get(index));
        }
    }

    @Override
    public boolean isReleased() {
        return isReleased;
    }

    /** Always returns {@code null}; this view does not track a failure cause. */
    @Override
    public Throwable getFailureCause() {
        return null;
    }

    /**
     * Returns the backlog of the selected queue, or 0 if no queue holds the required segment.
     * Note that the lookup may select a queue and consume its segment-start payload.
     */
    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return backlogOfCurrentQueue();
    }

    /**
     * Returns the backlog of the selected queue, or 0 if no queue holds the required segment.
     * Note that the lookup may select a queue and consume its segment-start payload.
     */
    @Override
    public int getNumberOfQueuedBuffers() {
        return backlogOfCurrentQueue();
    }

    @Override
    public void notifyDataAvailable() {
        throw new UnsupportedOperationException("Method notifyDataAvailable should never be called.");
    }

    @Override
    public void resumeConsumption() {
        throw new UnsupportedOperationException("Method resumeConsumption should never be called.");
    }

    @Override
    public void acknowledgeAllDataProcessed() {
        // Nothing to do.
    }

    @Override
    public void notifyNewBufferSize(int newBufferSize) {
        throw new UnsupportedOperationException("Method notifyNewBufferSize should never be called.");
    }

    /** Shared implementation of the two queued-buffer getters. */
    private int backlogOfCurrentQueue() {
        return findCurrentNettyPayloadQueue() ? getBacklog() : 0;
    }

    /**
     * Polls one payload from the given queue and unwraps it.
     *
     * @return the payload's buffer; empty if the queue is empty or the payload has no buffer
     * @throws IOException wrapping the payload's error, after all resources were released
     * @throws IllegalStateException if the polled payload carries a segment id
     */
    private Optional<Buffer> readNettyPayload(NettyPayloadManager nettyPayloadManager)
            throws IOException {
        NettyPayload payload = nettyPayloadManager.poll();
        if (payload == null) {
            return Optional.empty();
        }
        // Constant message on purpose: this runs for every buffer, so no formatting arguments.
        Preconditions.checkState(
                payload.getSegmentId() == NO_SEGMENT_ID,
                "Expected a data or error payload, but the polled payload carries a segment id.");

        Optional<Throwable> error = payload.getError();
        if (error.isPresent()) {
            releaseAllResources();
            throw new IOException(error.get());
        }
        return payload.getBuffer();
    }

    /** Returns the backlog of the selected queue, or 0 while no queue is selected. */
    private int getBacklog() {
        if (managerIndexContainsCurrentSegment == NO_MANAGER_SELECTED) {
            return 0;
        }
        return nettyPayloadManagers.get(managerIndexContainsCurrentSegment).getBacklog();
    }

    /** Returns whether the head of the queue is an error or a payload whose buffer is not a data buffer. */
    private boolean isEventOrError(NettyPayloadManager nettyPayloadManager) {
        NettyPayload head = nettyPayloadManager.peek();
        if (head == null) {
            return false;
        }
        if (head.getError().isPresent()) {
            return true;
        }
        Optional<Buffer> buffer = head.getBuffer();
        return buffer.isPresent() && !buffer.get().isBuffer();
    }

    /** Returns the data type of the payload's buffer, or {@code NONE} if there is no such buffer. */
    private Buffer.DataType getDataType(NettyPayload nettyPayload) {
        if (nettyPayload == null) {
            return Buffer.DataType.NONE;
        }
        Optional<Buffer> buffer = nettyPayload.getBuffer();
        return buffer.isPresent() ? buffer.get().getDataType() : Buffer.DataType.NONE;
    }

    /** Recycles every buffer left in the queue, then reports the connection as broken. */
    private void releaseQueue(
            NettyPayloadManager nettyPayloadManager,
            NettyServiceProducer nettyServiceProducer,
            NettyConnectionId nettyConnectionId) {
        NettyPayload payload;
        while ((payload = nettyPayloadManager.poll()) != null) {
            payload.getBuffer().ifPresent(Buffer::recycleBuffer);
        }
        nettyServiceProducer.connectionBroken(nettyConnectionId);
    }

    /**
     * Makes sure a queue is selected for the required segment.
     *
     * <p>If a queue is already selected and sending is not stopped, nothing changes. Otherwise the
     * queues are scanned in list order for one whose head payload carries the required segment id;
     * the first match is selected and that head payload is removed from the queue.
     *
     * @return {@code true} if a queue is selected when the method returns
     * @throws IllegalStateException if the removed head payload carries no segment id
     */
    private boolean findCurrentNettyPayloadQueue() {
        if (managerIndexContainsCurrentSegment != NO_MANAGER_SELECTED && !stopSendingData) {
            return true;
        }

        for (int index = 0; index < nettyPayloadManagers.size(); index++) {
            NettyPayloadManager candidate = nettyPayloadManagers.get(index);
            NettyPayload head = candidate.peek();
            if (head == null || head.getSegmentId() != requiredSegmentId) {
                continue;
            }

            managerIndexContainsCurrentSegment = index;
            // Remove the segment-start payload so that the next poll yields the segment's data.
            NettyPayload segmentStart = candidate.poll();
            Preconditions.checkState(
                    segmentStart.getSegmentId() != NO_SEGMENT_ID,
                    "The payload consumed as the start of segment %s carries no segment id.",
                    requiredSegmentId);
            return true;
        }
        return false;
    }
}
