package org.apache.flink.streaming.runtime.io;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/BarrierBuffer.class */
/**
 * Sits between an {@link InputGate} and its consumer and holds back input from channels that
 * have already delivered a {@link StreamingSuperstep} barrier until every channel has done so
 * (or the input has finished).
 *
 * <p>Elements read from a blocked channel are wrapped in a {@code SpillingBufferOrEvent}
 * (constructed with this buffer's {@code BufferSpiller} and {@code SpillReader}) and queued.
 * Once all channels are blocked the current superstep is published to the reader, the blocks
 * are released and the queued elements are handed out before new input is read from the gate.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its state; it appears to be
 * intended for use from a single thread — confirm against callers.
 */
public class BarrierBuffer {
    private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);

    /** Number of channels of the input gate, captured at construction time. */
    private final int totalNumberOfInputChannels;
    /** Reader the completed superstep is published to. */
    private final AbstractReader reader;
    /** Source of new buffers and events. */
    private final InputGate inputGate;
    private final SpillReader spillReader;
    private final BufferSpiller bufferSpiller;

    /** The superstep whose barriers are currently being collected; null until the first barrier. */
    private StreamingSuperstep currentSuperstep;
    private boolean superstepStarted;

    /** Elements released from a previous block phase that still have to be handed out. */
    private Queue<SpillingBufferOrEvent> nonprocessed = new LinkedList<SpillingBufferOrEvent>();
    /** Elements that arrived on (or were re-queued for) a currently blocked channel. */
    private Queue<SpillingBufferOrEvent> blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
    /** Indices of the channels that are currently blocked. */
    private Set<Integer> blockedChannels = new HashSet<Integer>();

    private boolean inputFinished = false;
    /** The end-of-partition element observed when the gate reported it was finished. */
    private BufferOrEvent endOfStreamEvent = null;

    /**
     * Creates a barrier buffer for the given gate.
     *
     * @param inputGate gate to read from; its channel count is read once here
     * @param abstractReader reader that receives the superstep once all channels are blocked
     * @throws RuntimeException wrapping the {@link IOException} if the spiller or the spill
     *     reader cannot be created
     */
    public BarrierBuffer(InputGate inputGate, AbstractReader abstractReader) {
        this.inputGate = inputGate;
        this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
        this.reader = abstractReader;
        try {
            this.bufferSpiller = new BufferSpiller();
            this.spillReader = new SpillReader();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Records the given superstep as the current one and marks it as started.
     *
     * @param streamingSuperstep the superstep carried by the first barrier of this round
     */
    protected void startSuperstep(StreamingSuperstep streamingSuperstep) {
        this.currentSuperstep = streamingSuperstep;
        this.superstepStarted = true;
        // Guard kept on purpose: getId() is only evaluated when debug logging is enabled.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Superstep started with id: {}", streamingSuperstep.getId());
        }
    }

    /**
     * Returns the next previously released element whose channel is not blocked.
     *
     * <p>Elements polled from the released queue that belong to a blocked channel are wrapped
     * again and appended to the blocked queue.
     *
     * @return the next element of a non-blocked channel, or {@code null} if the released queue
     *     has been drained
     * @throws IOException if a queued element cannot be retrieved
     */
    protected BufferOrEvent getNonProcessed() throws IOException {
        while (true) {
            SpillingBufferOrEvent queued = this.nonprocessed.poll();
            if (queued == null) {
                return null;
            }
            BufferOrEvent bufferOrEvent = queued.getBufferOrEvent();
            if (!isBlocked(bufferOrEvent.getChannelIndex())) {
                return bufferOrEvent;
            }
            // The channel got blocked again in the meantime: defer the element to the next release.
            this.blockedNonprocessed.add(
                    new SpillingBufferOrEvent(bufferOrEvent, this.bufferSpiller, this.spillReader));
        }
    }

    /**
     * @param i channel index
     * @return {@code true} if the channel with the given index is currently blocked
     */
    protected boolean isBlocked(int i) {
        return this.blockedChannels.contains(Integer.valueOf(i));
    }

    /**
     * @return {@code true} if the number of blocked channels equals the gate's channel count
     */
    public boolean isAllBlocked() {
        return this.blockedChannels.size() == this.totalNumberOfInputChannels;
    }

    /**
     * Returns the next element that does not belong to a blocked channel.
     *
     * <p>Previously released elements are served first. Otherwise the gate is read until an
     * element of a non-blocked channel shows up; elements of blocked channels are queued along
     * the way. An end-of-partition event is swallowed unless the gate reports that it is
     * finished, in which case it is remembered as the end-of-stream element. Once the input has
     * finished, any still-blocked elements are released and handed out, and finally the
     * end-of-stream element is returned.
     *
     * @return the next non-blocked element, or the remembered end-of-stream element once the
     *     input has finished and nothing is queued any more
     * @throws IOException if reading from the gate or from a queued element fails
     * @throws InterruptedException if reading from the gate is interrupted
     */
    public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
        BufferOrEvent released = getNonProcessed();
        if (released != null) {
            return released;
        }
        if (this.blockedNonprocessed.isEmpty() && this.inputFinished) {
            return this.endOfStreamEvent;
        }
        while (!this.inputFinished) {
            BufferOrEvent next = this.inputGate.getNextBufferOrEvent();
            if (next.isBuffer() || !(next.getEvent() instanceof EndOfPartitionEvent)) {
                if (!isBlocked(next.getChannelIndex())) {
                    return next;
                }
                this.blockedNonprocessed.add(
                        new SpillingBufferOrEvent(next, this.bufferSpiller, this.spillReader));
            } else if (this.inputGate.isFinished()) {
                this.endOfStreamEvent = next;
                this.inputFinished = true;
            }
        }
        // Input is finished: release whatever is still blocked and serve it through the
        // released queue. After the release no channel is blocked, so the recursion ends
        // after at most one more level.
        actOnAllBlocked();
        return getNextNonBlocked();
    }

    /**
     * Marks the given channel as blocked and, if that blocks every channel, publishes the
     * superstep and releases all blocks.
     *
     * @param i index of the channel to block
     * @throws RuntimeException if the channel is already blocked
     */
    protected void blockChannel(int i) {
        if (this.blockedChannels.contains(Integer.valueOf(i))) {
            throw new RuntimeException("Tried to block an already blocked channel");
        }
        this.blockedChannels.add(Integer.valueOf(i));
        LOG.debug("Channel blocked with index: {}", i);
        if (isAllBlocked()) {
            actOnAllBlocked();
        }
    }

    /**
     * Turns the queue of blocked elements into the queue of released elements, hands the
     * spiller's current file to the spill reader, resets the spiller, unblocks all channels and
     * marks the superstep as no longer started.
     *
     * @throws RuntimeException if released elements are still pending, or wrapping the
     *     {@link IOException} raised while switching the spill file
     */
    public void releaseBlocks() {
        if (!this.nonprocessed.isEmpty()) {
            throw new RuntimeException("Error in barrier buffer logic");
        }
        this.nonprocessed = this.blockedNonprocessed;
        this.blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
        try {
            this.spillReader.setSpillFile(this.bufferSpiller.getSpillFile());
            this.bufferSpiller.resetSpillFile();
            this.blockedChannels.clear();
            this.superstepStarted = false;
            LOG.debug("All barriers received, blocks released");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Publishes the current superstep to the reader (only if one has been recorded and the
     * input has not finished) and then releases all blocks.
     */
    public void actOnAllBlocked() {
        LOG.debug("Publishing barrier to the vertex");
        if (this.currentSuperstep != null && !this.inputFinished) {
            this.reader.publish(this.currentSuperstep);
        }
        releaseBlocks();
    }

    /**
     * Handles a superstep barrier: starts the superstep if none is in progress and blocks the
     * channel the barrier arrived on.
     *
     * @param bufferOrEvent element whose event is cast to {@link StreamingSuperstep}
     */
    public void processSuperstep(BufferOrEvent bufferOrEvent) {
        StreamingSuperstep streamingSuperstep = (StreamingSuperstep) bufferOrEvent.getEvent();
        if (!this.superstepStarted) {
            startSuperstep(streamingSuperstep);
        }
        blockChannel(bufferOrEvent.getChannelIndex());
    }

    /**
     * Closes the spiller and the spill reader and deletes their spill files, if any.
     *
     * <p>Every step is attempted even if an earlier one fails, so a failure while closing the
     * spiller no longer leaves the spill reader open or the spill files behind. If several
     * steps fail, the exception of the last failing step is the one that propagates.
     *
     * @throws IOException if closing the spiller or the spill reader fails
     */
    public void cleanup() throws IOException {
        try {
            this.bufferSpiller.close();
        } finally {
            try {
                deleteIfPresent(this.bufferSpiller.getSpillFile());
            } finally {
                try {
                    this.spillReader.close();
                } finally {
                    deleteIfPresent(this.spillReader.getSpillFile());
                }
            }
        }
    }

    /** Best-effort deletion of a spill file; a {@code null} file is ignored. */
    private static void deleteIfPresent(File spillFile) {
        if (spillFile != null) {
            spillFile.delete();
        }
    }

    /** Concatenates the string forms of the released queue and the blocked queue. */
    @Override
    public String toString() {
        return this.nonprocessed.toString() + this.blockedNonprocessed.toString();
    }

    /**
     * @return {@code true} if neither released nor blocked elements are queued
     */
    public boolean isEmpty() {
        return this.nonprocessed.isEmpty() && this.blockedNonprocessed.isEmpty();
    }
}
