package org.apache.flink.streaming.runtime.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@NotThreadSafe
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.class */
public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierUnaligner.class);
    private final String taskName;
    private final Map<InputChannelInfo, Boolean> hasInflightBuffers;
    private int numBarrierConsumed;
    private long currentConsumedCheckpointId;
    private final ThreadSafeUnaligner threadSafeUnaligner;

    /* JADX INFO: Access modifiers changed from: package-private */
    @ThreadSafe
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner$ThreadSafeUnaligner.class */
    public static class ThreadSafeUnaligner implements BufferReceivedListener, Closeable {
        private final Map<InputChannelInfo, Boolean> storeNewBuffers;
        private int numBarriersReceived;
        private CompletableFuture<Void> allBarriersReceivedFuture = FutureUtils.completedVoidFuture();
        private long currentReceivedCheckpointId = -1;
        private int numOpenChannels;
        private final SubtaskCheckpointCoordinator checkpointCoordinator;
        private final CheckpointBarrierUnaligner handler;

        ThreadSafeUnaligner(SubtaskCheckpointCoordinator subtaskCheckpointCoordinator, CheckpointBarrierUnaligner checkpointBarrierUnaligner, InputGate... inputGateArr) {
            this.storeNewBuffers = (Map) Arrays.stream(inputGateArr).flatMap(inputGate -> {
                return inputGate.getChannelInfos().stream();
            }).collect(Collectors.toMap(Function.identity(), inputChannelInfo -> {
                return false;
            }));
            this.numOpenChannels = this.storeNewBuffers.size();
            this.checkpointCoordinator = subtaskCheckpointCoordinator;
            this.handler = checkpointBarrierUnaligner;
        }

        public synchronized void notifyBarrierReceived(CheckpointBarrier checkpointBarrier, InputChannelInfo inputChannelInfo) throws IOException {
            long id = checkpointBarrier.getId();
            if (this.currentReceivedCheckpointId < id) {
                handleNewCheckpoint(checkpointBarrier);
                this.handler.executeInTaskThread(() -> {
                    this.handler.notifyCheckpoint(checkpointBarrier);
                }, "notifyCheckpoint", new Object[0]);
            }
            if (id == this.currentReceivedCheckpointId && this.storeNewBuffers.get(inputChannelInfo).booleanValue()) {
                if (CheckpointBarrierUnaligner.LOG.isDebugEnabled()) {
                    CheckpointBarrierUnaligner.LOG.debug("{}: Received barrier from channel {} @ {}.", new Object[]{this.handler.taskName, inputChannelInfo, Long.valueOf(id)});
                }
                this.storeNewBuffers.put(inputChannelInfo, false);
                int i = this.numBarriersReceived + 1;
                this.numBarriersReceived = i;
                if (i == this.numOpenChannels) {
                    this.allBarriersReceivedFuture.complete(null);
                }
            }
        }

        public synchronized void notifyBufferReceived(Buffer buffer, InputChannelInfo inputChannelInfo) {
            if (this.storeNewBuffers.get(inputChannelInfo).booleanValue()) {
                this.checkpointCoordinator.getChannelStateWriter().addInputData(this.currentReceivedCheckpointId, inputChannelInfo, -2, CloseableIterator.ofElement(buffer, (v0) -> {
                    v0.recycleBuffer();
                }));
            } else {
                buffer.recycleBuffer();
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public synchronized void close() throws IOException {
            this.allBarriersReceivedFuture.cancel(false);
        }

        private synchronized void handleNewCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
            long id = checkpointBarrier.getId();
            if (!this.allBarriersReceivedFuture.isDone()) {
                CheckpointException checkpointException = new CheckpointException("Barrier id: " + id, CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED);
                if (isCheckpointPending()) {
                    CheckpointBarrierUnaligner.LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. Skipping current checkpoint.", new Object[]{this.handler.taskName, Long.valueOf(id), Long.valueOf(this.currentReceivedCheckpointId)});
                    long j = this.currentReceivedCheckpointId;
                    this.handler.executeInTaskThread(() -> {
                        this.handler.notifyAbort(j, checkpointException);
                    }, "notifyAbort", new Object[0]);
                }
                this.allBarriersReceivedFuture.completeExceptionally(checkpointException);
            }
            this.handler.markCheckpointStart(checkpointBarrier.getTimestamp());
            this.currentReceivedCheckpointId = id;
            this.storeNewBuffers.entrySet().forEach(entry -> {
            });
            this.numBarriersReceived = 0;
            this.allBarriersReceivedFuture = new CompletableFuture<>();
            this.checkpointCoordinator.initCheckpoint(id, checkpointBarrier.getCheckpointOptions());
        }

        synchronized CompletableFuture<Void> getAllBarriersReceivedFuture(long j) {
            if (j < this.currentReceivedCheckpointId) {
                return FutureUtils.completedVoidFuture();
            }
            if (j > this.currentReceivedCheckpointId) {
                throw new IllegalStateException("Checkpoint " + j + " has not been started at all");
            }
            return this.allBarriersReceivedFuture;
        }

        synchronized void onChannelClosed() throws IOException {
            this.numOpenChannels--;
            if (resetPendingCheckpoint()) {
                this.handler.notifyAbort(this.currentReceivedCheckpointId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
            }
        }

        synchronized boolean setCancelledCheckpointId(long j) {
            if (this.currentReceivedCheckpointId > j) {
                return false;
            }
            if (this.currentReceivedCheckpointId == j && this.numBarriersReceived == 0) {
                return false;
            }
            resetPendingCheckpoint();
            this.currentReceivedCheckpointId = j;
            return true;
        }

        synchronized void tryAbortPendingCheckpoint(long j, CheckpointException checkpointException) throws IOException {
            if (j <= this.currentReceivedCheckpointId || !resetPendingCheckpoint()) {
                return;
            }
            this.handler.notifyAbort(this.currentReceivedCheckpointId, checkpointException);
        }

        private boolean resetPendingCheckpoint() {
            if (this.numBarriersReceived == 0) {
                return false;
            }
            this.storeNewBuffers.entrySet().forEach(entry -> {
            });
            this.numBarriersReceived = 0;
            return true;
        }

        @VisibleForTesting
        synchronized int getNumOpenChannels() {
            return this.numOpenChannels;
        }

        @VisibleForTesting
        synchronized long getCurrentCheckpointId() {
            return this.currentReceivedCheckpointId;
        }

        @VisibleForTesting
        boolean isCheckpointPending() {
            return this.numBarriersReceived > 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CheckpointBarrierUnaligner(SubtaskCheckpointCoordinator subtaskCheckpointCoordinator, String str, AbstractInvokable abstractInvokable, InputGate... inputGateArr) {
        super(abstractInvokable);
        this.currentConsumedCheckpointId = -1L;
        this.taskName = str;
        this.hasInflightBuffers = (Map) Arrays.stream(inputGateArr).flatMap(inputGate -> {
            return inputGate.getChannelInfos().stream();
        }).collect(Collectors.toMap(Function.identity(), inputChannelInfo -> {
            return false;
        }));
        this.threadSafeUnaligner = new ThreadSafeUnaligner((SubtaskCheckpointCoordinator) Preconditions.checkNotNull(subtaskCheckpointCoordinator), this, inputGateArr);
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public void processBarrier(CheckpointBarrier checkpointBarrier, InputChannelInfo inputChannelInfo) throws Exception {
        long id = checkpointBarrier.getId();
        if (this.currentConsumedCheckpointId <= id) {
            if (this.currentConsumedCheckpointId != id || isCheckpointPending()) {
                if (this.currentConsumedCheckpointId < id) {
                    this.currentConsumedCheckpointId = id;
                    this.numBarrierConsumed = 0;
                    this.hasInflightBuffers.entrySet().forEach(entry -> {
                    });
                }
                if (this.currentConsumedCheckpointId == id) {
                    this.hasInflightBuffers.put(inputChannelInfo, false);
                    this.numBarrierConsumed++;
                }
                this.threadSafeUnaligner.notifyBarrierReceived(checkpointBarrier, inputChannelInfo);
            }
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public void abortPendingCheckpoint(long j, CheckpointException checkpointException) throws IOException {
        this.threadSafeUnaligner.tryAbortPendingCheckpoint(j, checkpointException);
        if (j > this.currentConsumedCheckpointId) {
            resetPendingCheckpoint(j);
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public void processCancellationBarrier(CancelCheckpointMarker cancelCheckpointMarker) throws Exception {
        long checkpointId = cancelCheckpointMarker.getCheckpointId();
        if (this.threadSafeUnaligner.setCancelledCheckpointId(checkpointId)) {
            notifyAbort(checkpointId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
        }
        if (checkpointId >= this.currentConsumedCheckpointId) {
            resetPendingCheckpoint(checkpointId);
            this.currentConsumedCheckpointId = checkpointId;
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public void processEndOfPartition() throws Exception {
        this.threadSafeUnaligner.onChannelClosed();
        resetPendingCheckpoint(-1L);
    }

    private void resetPendingCheckpoint(long j) {
        if (isCheckpointPending()) {
            LOG.warn("{}: Received barrier or EndOfPartition(-1) {} before completing current checkpoint {}. Skipping current checkpoint.", new Object[]{this.taskName, Long.valueOf(j), Long.valueOf(this.currentConsumedCheckpointId)});
            this.hasInflightBuffers.entrySet().forEach(entry -> {
            });
            this.numBarrierConsumed = 0;
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public long getLatestCheckpointId() {
        return this.currentConsumedCheckpointId;
    }

    public String toString() {
        return String.format("%s: last checkpoint: %d", this.taskName, Long.valueOf(this.currentConsumedCheckpointId));
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        super.close();
        this.threadSafeUnaligner.close();
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public boolean hasInflightData(long j, InputChannelInfo inputChannelInfo) {
        if (j < this.currentConsumedCheckpointId) {
            return false;
        }
        if (j > this.currentConsumedCheckpointId) {
            return true;
        }
        return this.hasInflightBuffers.get(inputChannelInfo).booleanValue();
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public CompletableFuture<Void> getAllBarriersReceivedFuture(long j) {
        return this.threadSafeUnaligner.getAllBarriersReceivedFuture(j);
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public Optional<BufferReceivedListener> getBufferReceivedListener() {
        return Optional.of(this.threadSafeUnaligner);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public boolean isCheckpointPending() {
        return this.numBarrierConsumed > 0;
    }

    @VisibleForTesting
    int getNumOpenChannels() {
        return this.threadSafeUnaligner.getNumOpenChannels();
    }

    @VisibleForTesting
    ThreadSafeUnaligner getThreadSafeUnaligner() {
        return this.threadSafeUnaligner;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
        if (checkpointBarrier.getId() >= this.threadSafeUnaligner.getCurrentCheckpointId()) {
            super.notifyCheckpoint(checkpointBarrier, 0L);
        }
    }
}
