package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Barrier handler that follows a single checkpoint at a time.
 *
 * <p>The handler remembers the id of the most recent checkpoint it has seen, counts how many
 * barriers for that id have arrived, and compares the count against the number of channels that
 * are still open. Decisions about what to do on the first barrier, on every barrier, on the last
 * barrier and on aborts are delegated to a {@link CheckpointBarrierBehaviourController}.
 *
 * <p>A checkpoint is considered pending while the barrier counter is greater than zero.
 */
@Internal
@NotThreadSafe
public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SingleCheckpointBarrierHandler.class);

    /** Prefix used in every log message and in {@link #toString()}. */
    private final String taskName;

    /** Strategy consulted for every barrier, obsolete barrier and abort. */
    private final CheckpointBarrierBehaviourController controller;

    /** Barriers counted so far for {@link #currentCheckpointId}; zero means nothing is pending. */
    private int numBarriersReceived;

    /** Id of the latest checkpoint seen (started or aborted); {@code -1} before the first one. */
    private long currentCheckpointId = -1L;

    /** Number of channels that have not yet reported end of partition. */
    private int numOpenChannels;

    /** Completed once all barriers of the current checkpoint arrived, or failed on abort. */
    private CompletableFuture<Void> allBarriersReceivedFuture = FutureUtils.completedVoidFuture();

    /**
     * Builds a handler backed by an {@link UnalignedController}.
     *
     * @param subtaskCheckpointCoordinator coordinator handed to the controller
     * @param str task name used in log output
     * @param abstractInvokable invokable passed to the superclass constructor
     * @param checkpointableInputArr inputs whose channel infos are counted to obtain the initial
     *     number of open channels; also handed to the controller
     * @return the new handler
     */
    @VisibleForTesting
    static SingleCheckpointBarrierHandler createUnalignedCheckpointBarrierHandler(SubtaskCheckpointCoordinator subtaskCheckpointCoordinator, String str, AbstractInvokable abstractInvokable, CheckpointableInput... checkpointableInputArr) {
        final long totalChannels = Arrays.stream(checkpointableInputArr)
                .flatMap(input -> input.getChannelInfos().stream())
                .count();
        final CheckpointBarrierBehaviourController unalignedController =
                new UnalignedController(subtaskCheckpointCoordinator, checkpointableInputArr);
        return new SingleCheckpointBarrierHandler(str, abstractInvokable, (int) totalChannels, unalignedController);
    }

    /**
     * Creates a handler with no checkpoint in progress.
     *
     * <p>NOTE(review): a decompiler note on this constructor said the access modifier was changed
     * from package-private; it is kept {@code public} so existing callers keep compiling.
     *
     * @param str task name used in log output
     * @param abstractInvokable invokable passed to the superclass constructor
     * @param i initial number of open channels
     * @param checkpointBarrierBehaviourController controller that receives all barrier callbacks
     */
    public SingleCheckpointBarrierHandler(String str, AbstractInvokable abstractInvokable, int i, CheckpointBarrierBehaviourController checkpointBarrierBehaviourController) {
        super(abstractInvokable);
        this.taskName = str;
        this.controller = checkpointBarrierBehaviourController;
        this.numOpenChannels = i;
    }

    /**
     * Handles one checkpoint barrier arriving on one channel.
     *
     * <p>Obsolete barriers are forwarded to the controller and otherwise ignored. A barrier with
     * an id above the current one starts tracking a new checkpoint (cancelling a pending one
     * first). Every accepted barrier is reported to the controller and counted; when the count
     * reaches the number of open channels the checkpoint is finished.
     *
     * @param checkpointBarrier the received barrier
     * @param inputChannelInfo channel the barrier arrived on
     * @throws IOException propagated from the controller or the superclass callbacks
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public void processBarrier(CheckpointBarrier checkpointBarrier, InputChannelInfo inputChannelInfo) throws IOException {
        final long barrierId = checkpointBarrier.getId();
        LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, inputChannelInfo, barrierId);

        if (isObsoleteBarrier(barrierId)) {
            controller.obsoleteBarrierReceived(inputChannelInfo, checkpointBarrier);
            return;
        }

        if (barrierId > currentCheckpointId
                && !startCheckpointOnFirstBarrier(barrierId, checkpointBarrier, inputChannelInfo)) {
            // The new checkpoint was aborted while processing its first barrier.
            return;
        }

        controller.barrierReceived(inputChannelInfo, checkpointBarrier);

        if (barrierId != currentCheckpointId) {
            return;
        }
        numBarriersReceived++;
        if (numBarriersReceived == numOpenChannels) {
            completeCheckpointOnLastBarrier(checkpointBarrier, inputChannelInfo);
        }
    }

    /**
     * Tells whether a barrier id belongs to a checkpoint that is no longer tracked: either it is
     * older than the current id, or it equals the current id while nothing is pending.
     */
    private boolean isObsoleteBarrier(long barrierId) {
        if (barrierId < currentCheckpointId) {
            return true;
        }
        return barrierId == currentCheckpointId && !isCheckpointPending();
    }

    /**
     * Switches tracking to a newer checkpoint and runs the controller's first-barrier hook.
     *
     * @return {@code true} if barrier processing should continue, {@code false} if the controller
     *     raised a {@link CheckpointException} and the checkpoint was aborted
     */
    private boolean startCheckpointOnFirstBarrier(long barrierId, CheckpointBarrier barrier, InputChannelInfo channel) throws IOException {
        if (isCheckpointPending()) {
            cancelSubsumedCheckpoint(barrierId);
        }

        // With exactly one open channel the first barrier is also the last one.
        if (getNumOpenChannels() == 1) {
            markAlignmentStartAndEnd(barrierId, barrier.getTimestamp());
        } else {
            markAlignmentStart(barrierId, barrier.getTimestamp());
        }

        currentCheckpointId = barrierId;
        numBarriersReceived = 0;
        allBarriersReceivedFuture = new CompletableFuture<>();

        try {
            if (controller.preProcessFirstBarrier(channel, barrier)) {
                LOG.debug("{}: Triggering checkpoint {} on the first barrier at {}.", taskName, barrier.getId(), barrier.getTimestamp());
                notifyCheckpoint(barrier);
            }
        } catch (CheckpointException e) {
            abortInternal(barrier.getId(), e);
            return false;
        }
        return true;
    }

    /**
     * Finishes the current checkpoint after its last expected barrier: resets the counter, runs
     * the controller's last-barrier hook and completes the all-barriers-received future.
     */
    private void completeCheckpointOnLastBarrier(CheckpointBarrier barrier, InputChannelInfo channel) throws IOException {
        if (getNumOpenChannels() > 1) {
            markAlignmentEnd();
        }
        numBarriersReceived = 0;
        if (controller.postProcessLastBarrier(channel, barrier)) {
            LOG.debug("{}: Triggering checkpoint {} on the last barrier at {}.", taskName, barrier.getId(), barrier.getTimestamp());
            notifyCheckpoint(barrier);
        }
        allBarriersReceivedFuture.complete(null);
    }

    /** Barrier announcements are ignored by this handler; the method is intentionally empty. */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public void processBarrierAnnouncement(CheckpointBarrier checkpointBarrier, int i, InputChannelInfo inputChannelInfo) throws IOException {
        // Nothing to do.
    }

    /**
     * Handles a cancellation marker. The marker is acted upon only if it refers to a checkpoint
     * newer than the current one, or to the current one while barriers have been counted for it.
     *
     * @param cancelCheckpointMarker the received marker
     * @throws IOException propagated from the abort callbacks
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public void processCancellationBarrier(CancelCheckpointMarker cancelCheckpointMarker) throws IOException {
        final long cancelledId = cancelCheckpointMarker.getCheckpointId();
        final boolean newerThanCurrent = cancelledId > currentCheckpointId;
        final boolean currentAndCounting = cancelledId == currentCheckpointId && numBarriersReceived > 0;
        if (!newerThanCurrent && !currentAndCounting) {
            return;
        }
        LOG.debug("{}: Received cancellation {}.", taskName, cancelledId);
        abortInternal(cancelledId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
    }

    /**
     * Aborts the checkpoint with the given id.
     *
     * <p>The current id is raised to the aborted id (never lowered) and the counter is zeroed, so
     * later barriers carrying that id are classified as obsolete by {@link #processBarrier}.
     *
     * @param j id of the checkpoint to abort
     * @param checkpointException reason passed to the controller, the abort notification and the
     *     all-barriers-received future
     */
    private void abortInternal(long j, CheckpointException checkpointException) throws IOException {
        currentCheckpointId = Math.max(j, currentCheckpointId);
        numBarriersReceived = 0;
        controller.abortPendingCheckpoint(j, checkpointException);
        if (currentCheckpointId == j) {
            resetAlignment();
        }
        notifyAbort(j, checkpointException);
        allBarriersReceivedFuture.completeExceptionally(checkpointException);
    }

    /**
     * Records that one channel reached end of partition. If a checkpoint is pending at that
     * moment it is aborted with {@code CHECKPOINT_DECLINED_INPUT_END_OF_STREAM}.
     *
     * @throws IOException propagated from the abort callbacks
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public void processEndOfPartition() throws IOException {
        numOpenChannels--;
        if (!isCheckpointPending()) {
            return;
        }
        LOG.warn("{}: Received EndOfPartition(-1) before completing current checkpoint {}. Skipping current checkpoint.", taskName, currentCheckpointId);
        numBarriersReceived = 0;
        declineCurrentCheckpoint(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
    }

    /** Returns the id of the latest checkpoint this handler has seen, or {@code -1} if none. */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public long getLatestCheckpointId() {
        return currentCheckpointId;
    }

    /** Cancels the all-barriers-received future (without interruption) and closes the superclass. */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        allBarriersReceivedFuture.cancel(false);
        super.close();
    }

    /** A checkpoint is pending exactly while at least one barrier has been counted for it. */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    protected boolean isCheckpointPending() {
        return numBarriersReceived > 0;
    }

    /**
     * Aborts the pending checkpoint because a barrier of a newer checkpoint arrived.
     *
     * @param j id of the newer barrier, included in the exception message and the warning
     */
    private void cancelSubsumedCheckpoint(long j) throws IOException {
        final CheckpointException subsumed = new CheckpointException("Barrier id: " + j, CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED);
        LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. Skipping current checkpoint.", taskName, j, currentCheckpointId);
        declineCurrentCheckpoint(subsumed);
    }

    /**
     * Shared abort sequence for the checkpoint currently tracked: informs the controller, fails
     * the all-barriers-received future, then sends the abort notification.
     */
    private void declineCurrentCheckpoint(CheckpointException cause) throws IOException {
        controller.abortPendingCheckpoint(currentCheckpointId, cause);
        allBarriersReceivedFuture.completeExceptionally(cause);
        notifyAbort(currentCheckpointId, cause);
    }

    /**
     * Returns the future tied to the barriers of the given checkpoint.
     *
     * @param j checkpoint id
     * @return an already completed future for ids below the current one, otherwise the future of
     *     the current checkpoint
     * @throws IllegalStateException if {@code j} is above the current checkpoint id
     */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public CompletableFuture<Void> getAllBarriersReceivedFuture(long j) {
        if (j > currentCheckpointId) {
            throw new IllegalStateException("Checkpoint " + j + " has not been started at all");
        }
        return j < currentCheckpointId ? FutureUtils.completedVoidFuture() : allBarriersReceivedFuture;
    }

    /** Returns the number of channels that have not yet reported end of partition. */
    @VisibleForTesting
    int getNumOpenChannels() {
        return numOpenChannels;
    }

    /** Summarises task name, current checkpoint id, barrier count and open channel count. */
    public String toString() {
        return String.format("%s: current checkpoint: %d, current barriers: %d, open channels: %d", taskName, currentCheckpointId, numBarriersReceived, numOpenChannels);
    }
}
