/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayDeque;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.streaming.runtime.io.BufferSpiller;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class BarrierBuffer
implements CheckpointBarrierHandler {
    private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
    private final InputGate inputGate;
    private final boolean[] blockedChannels;
    private final int totalNumberOfInputChannels;
    private final BufferSpiller bufferSpiller;
    private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered;
    private final long maxBufferedBytes;
    private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
    private StatefulTask<?> toNotifyOnCheckpoint;
    private long currentCheckpointId = -1L;
    private int numBarriersReceived;
    private int numClosedChannels;
    private long numQueuedBytes;
    private long startOfAlignmentTimestamp;
    private long latestAlignmentDurationNanos;
    private boolean endOfStream;

    public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
        this(inputGate, ioManager, -1L);
    }

    public BarrierBuffer(InputGate inputGate, IOManager ioManager, long maxBufferedBytes) throws IOException {
        Preconditions.checkArgument((maxBufferedBytes == -1L || maxBufferedBytes > 0L ? 1 : 0) != 0);
        this.inputGate = inputGate;
        this.maxBufferedBytes = maxBufferedBytes;
        this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
        this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
        this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
        this.queuedBuffered = new ArrayDeque();
    }

    @Override
    public BufferOrEvent getNextNonBlocked() throws Exception {
        block9: {
            BufferOrEvent next;
            while (true) {
                if (this.currentBuffered == null) {
                    next = this.inputGate.getNextBufferOrEvent();
                } else {
                    next = this.currentBuffered.getNext();
                    if (next == null) {
                        this.completeBufferedSequence();
                        return this.getNextNonBlocked();
                    }
                }
                if (next == null) break block9;
                if (this.isBlocked(next.getChannelIndex())) {
                    this.bufferSpiller.add(next);
                    this.checkSizeLimit();
                    continue;
                }
                if (next.isBuffer()) {
                    return next;
                }
                if (next.getEvent().getClass() == CheckpointBarrier.class) {
                    if (this.endOfStream) continue;
                    this.processBarrier((CheckpointBarrier)next.getEvent(), next.getChannelIndex());
                    continue;
                }
                if (next.getEvent().getClass() != CancelCheckpointMarker.class) break;
                this.processCancellationBarrier((CancelCheckpointMarker)next.getEvent());
            }
            if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
                this.processEndOfPartition();
            }
            return next;
        }
        if (!this.endOfStream) {
            this.endOfStream = true;
            this.releaseBlocksAndResetBarriers();
            return this.getNextNonBlocked();
        }
        return null;
    }

    private void completeBufferedSequence() throws IOException {
        LOG.debug("Finished feeding back buffered data");
        this.currentBuffered.cleanup();
        this.currentBuffered = this.queuedBuffered.pollFirst();
        if (this.currentBuffered != null) {
            this.currentBuffered.open();
            this.numQueuedBytes -= this.currentBuffered.size();
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
        long barrierId = receivedBarrier.getId();
        if (this.totalNumberOfInputChannels == 1) {
            if (barrierId <= this.currentCheckpointId) return;
            this.currentCheckpointId = barrierId;
            this.notifyCheckpoint(receivedBarrier);
            return;
        }
        if (this.numBarriersReceived > 0) {
            if (barrierId == this.currentCheckpointId) {
                this.onBarrier(channelIndex);
            } else {
                if (barrierId <= this.currentCheckpointId) return;
                LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. Skipping current checkpoint.", (Object)barrierId, (Object)this.currentCheckpointId);
                this.notifyAbort(this.currentCheckpointId, (CheckpointDeclineException)new CheckpointDeclineSubsumedException(barrierId));
                this.releaseBlocksAndResetBarriers();
                this.beginNewAlignment(barrierId, channelIndex);
            }
        } else {
            if (barrierId <= this.currentCheckpointId) return;
            this.beginNewAlignment(barrierId, channelIndex);
        }
        if (this.numBarriersReceived + this.numClosedChannels != this.totalNumberOfInputChannels) return;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received all barriers, triggering checkpoint {} at {}", (Object)receivedBarrier.getId(), (Object)receivedBarrier.getTimestamp());
        }
        this.releaseBlocksAndResetBarriers();
        this.notifyCheckpoint(receivedBarrier);
    }

    private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
        long barrierId = cancelBarrier.getCheckpointId();
        if (this.totalNumberOfInputChannels == 1) {
            if (barrierId > this.currentCheckpointId) {
                this.currentCheckpointId = barrierId;
                this.notifyAbortOnCancellationBarrier(barrierId);
            }
            return;
        }
        if (this.numBarriersReceived > 0) {
            if (barrierId == this.currentCheckpointId) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Checkpoint {} canceled, aborting alignment", (Object)barrierId);
                }
                this.releaseBlocksAndResetBarriers();
                this.notifyAbortOnCancellationBarrier(barrierId);
            } else if (barrierId > this.currentCheckpointId) {
                LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. Skipping current checkpoint.", (Object)barrierId, (Object)this.currentCheckpointId);
                this.releaseBlocksAndResetBarriers();
                this.currentCheckpointId = barrierId;
                this.startOfAlignmentTimestamp = 0L;
                this.latestAlignmentDurationNanos = 0L;
                this.notifyAbort(this.currentCheckpointId, (CheckpointDeclineException)new CheckpointDeclineSubsumedException(barrierId));
                this.notifyAbortOnCancellationBarrier(barrierId);
            }
        } else if (barrierId > this.currentCheckpointId) {
            this.currentCheckpointId = barrierId;
            this.startOfAlignmentTimestamp = 0L;
            this.latestAlignmentDurationNanos = 0L;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Checkpoint {} canceled, skipping alignment", (Object)barrierId);
            }
            this.notifyAbortOnCancellationBarrier(barrierId);
        }
    }

    private void processEndOfPartition() throws Exception {
        ++this.numClosedChannels;
        if (this.numBarriersReceived > 0) {
            this.notifyAbort(this.currentCheckpointId, (CheckpointDeclineException)new InputEndOfStreamException());
            this.releaseBlocksAndResetBarriers();
        }
    }

    private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
        if (this.toNotifyOnCheckpoint != null) {
            this.toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
        }
    }

    private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception {
        this.notifyAbort(checkpointId, (CheckpointDeclineException)new CheckpointDeclineOnCancellationBarrierException());
    }

    private void notifyAbort(long checkpointId, CheckpointDeclineException cause) throws Exception {
        if (this.toNotifyOnCheckpoint != null) {
            this.toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, (Throwable)cause);
        }
    }

    private void checkSizeLimit() throws Exception {
        if (this.maxBufferedBytes > 0L && this.numQueuedBytes + this.bufferSpiller.getBytesWritten() > this.maxBufferedBytes) {
            LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded", (Object)this.currentCheckpointId, (Object)this.maxBufferedBytes);
            this.releaseBlocksAndResetBarriers();
            this.notifyAbort(this.currentCheckpointId, (CheckpointDeclineException)new AlignmentLimitExceededException(this.maxBufferedBytes));
        }
    }

    @Override
    public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
        if (this.toNotifyOnCheckpoint != null) {
            throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee");
        }
        this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
    }

    @Override
    public boolean isEmpty() {
        return this.currentBuffered == null;
    }

    @Override
    public void cleanup() throws IOException {
        this.bufferSpiller.close();
        if (this.currentBuffered != null) {
            this.currentBuffered.cleanup();
        }
        for (BufferSpiller.SpilledBufferOrEventSequence seq : this.queuedBuffered) {
            seq.cleanup();
        }
        this.queuedBuffered.clear();
        this.numQueuedBytes = 0L;
    }

    private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
        this.currentCheckpointId = checkpointId;
        this.onBarrier(channelIndex);
        this.startOfAlignmentTimestamp = System.nanoTime();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.');
        }
    }

    private boolean isBlocked(int channelIndex) {
        return this.blockedChannels[channelIndex];
    }

    private void onBarrier(int channelIndex) throws IOException {
        if (!this.blockedChannels[channelIndex]) {
            this.blockedChannels[channelIndex] = true;
            ++this.numBarriersReceived;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received barrier from channel " + channelIndex);
            }
        } else {
            throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex);
        }
    }

    private void releaseBlocksAndResetBarriers() throws IOException {
        LOG.debug("End of stream alignment, feeding buffered data back");
        for (int i = 0; i < this.blockedChannels.length; ++i) {
            this.blockedChannels[i] = false;
        }
        if (this.currentBuffered == null) {
            this.currentBuffered = this.bufferSpiller.rollOver();
            if (this.currentBuffered != null) {
                this.currentBuffered.open();
            }
        } else {
            LOG.debug("Checkpoint skipped via buffered data:Pushing back current alignment buffers and feeding back new alignment data first.");
            BufferSpiller.SpilledBufferOrEventSequence bufferedNow = this.bufferSpiller.rollOverWithNewBuffer();
            if (bufferedNow != null) {
                bufferedNow.open();
                this.queuedBuffered.addFirst(this.currentBuffered);
                this.numQueuedBytes += this.currentBuffered.size();
                this.currentBuffered = bufferedNow;
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Size of buffered data: {} bytes", (Object)(this.currentBuffered == null ? 0L : this.currentBuffered.size()));
        }
        this.numBarriersReceived = 0;
        if (this.startOfAlignmentTimestamp > 0L) {
            this.latestAlignmentDurationNanos = System.nanoTime() - this.startOfAlignmentTimestamp;
            this.startOfAlignmentTimestamp = 0L;
        }
    }

    public long getCurrentCheckpointId() {
        return this.currentCheckpointId;
    }

    @Override
    public long getAlignmentDurationNanos() {
        long start = this.startOfAlignmentTimestamp;
        if (start <= 0L) {
            return this.latestAlignmentDurationNanos;
        }
        return System.nanoTime() - start;
    }

    public String toString() {
        return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d", this.currentCheckpointId, this.numBarriersReceived, this.numClosedChannels);
    }
}

