package org.apache.flink.runtime.state;

import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/TaskStateManagerImpl.class */
/**
 * {@link TaskStateManager} implementation that combines two collaborators:
 * a {@link CheckpointResponder}, to which checkpoint acknowledgements are sent, and a
 * {@link TaskLocalStateStore}, in which a task-local copy of the state is kept.
 *
 * <p>On restore, the state assigned by the job manager (if any) is paired with the matching
 * entry found in the local state store for the same checkpoint id.
 */
public class TaskStateManagerImpl implements TaskStateManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskStateManagerImpl.class);

    /** Job id passed along with every checkpoint acknowledgement. */
    private final JobID jobId;

    /** Execution attempt id passed along with every checkpoint acknowledgement. */
    private final ExecutionAttemptID executionAttemptID;

    /** Restore information handed in at construction; {@code null} when there is none. */
    @Nullable
    private final JobManagerTaskRestore jobManagerTaskRestore;

    /** Store used for the task-local state snapshots. */
    private final TaskLocalStateStore localStateStore;

    /** Receiver of the checkpoint acknowledgements. */
    private final CheckpointResponder checkpointResponder;

    /**
     * Creates a task state manager. The arguments are stored as given; no validation or copying
     * is performed here.
     *
     * @param jobID id reported with each acknowledged checkpoint
     * @param executionAttemptID execution attempt reported with each acknowledged checkpoint
     * @param taskLocalStateStore store for the task-local state
     * @param jobManagerTaskRestore restore information, or {@code null} if there is none
     * @param checkpointResponder target for checkpoint acknowledgements
     */
    public TaskStateManagerImpl(@Nonnull JobID jobID, @Nonnull ExecutionAttemptID executionAttemptID, @Nonnull TaskLocalStateStore taskLocalStateStore, @Nullable JobManagerTaskRestore jobManagerTaskRestore, @Nonnull CheckpointResponder checkpointResponder) {
        this.jobId = jobID;
        this.localStateStore = taskLocalStateStore;
        this.jobManagerTaskRestore = jobManagerTaskRestore;
        this.executionAttemptID = executionAttemptID;
        this.checkpointResponder = checkpointResponder;
    }

    /**
     * Reports the snapshots taken for a checkpoint.
     *
     * <p>The second snapshot is first written to the local state store under the checkpoint id;
     * afterwards the checkpoint is acknowledged to the {@link CheckpointResponder} together with
     * the first snapshot.
     *
     * @param checkpointMetaData meta data from which the checkpoint id is read
     * @param checkpointMetrics metrics forwarded with the acknowledgement
     * @param taskStateSnapshot snapshot sent with the acknowledgement; may be {@code null}
     * @param taskStateSnapshot2 snapshot handed to the local state store; may be {@code null}
     */
    @Override
    public void reportTaskStateSnapshots(@Nonnull CheckpointMetaData checkpointMetaData, @Nonnull CheckpointMetrics checkpointMetrics, @Nullable TaskStateSnapshot taskStateSnapshot, @Nullable TaskStateSnapshot taskStateSnapshot2) {
        final long checkpointId = checkpointMetaData.getCheckpointId();

        // Store locally before acknowledging, keeping the original call order.
        this.localStateStore.storeLocalState(checkpointId, taskStateSnapshot2);

        this.checkpointResponder.acknowledgeCheckpoint(
                this.jobId,
                this.executionAttemptID,
                checkpointId,
                checkpointMetrics,
                taskStateSnapshot);
    }

    /**
     * Builds the prioritized restore state for a single operator.
     *
     * <p>If no restore information is present, or the job manager snapshot holds no subtask state
     * for the operator, {@link PrioritizedOperatorSubtaskState#emptyNotRestored()} is returned and
     * the local state store is not touched. Otherwise the local state for the restore checkpoint
     * is looked up, the store is asked to prune every checkpoint whose id differs from the restore
     * checkpoint id, and the result is built from the job manager state plus at most one local
     * alternative.
     *
     * @param operatorID operator whose state is requested
     * @return the prioritized state for the operator; never {@code null} according to the
     *     {@code @Nonnull} contract of this method
     */
    @Override
    @Nonnull
    public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) {
        if (this.jobManagerTaskRestore == null) {
            return PrioritizedOperatorSubtaskState.emptyNotRestored();
        }

        final OperatorSubtaskState jobManagerSubtaskState =
                this.jobManagerTaskRestore.getTaskStateSnapshot().getSubtaskStateByOperatorID(operatorID);
        if (jobManagerSubtaskState == null) {
            return PrioritizedOperatorSubtaskState.emptyNotRestored();
        }

        final long restoreCheckpointId = this.jobManagerTaskRestore.getRestoreCheckpointId();
        final TaskStateSnapshot localStateSnapshot = this.localStateStore.retrieveLocalState(restoreCheckpointId);

        // The local state is retrieved before pruning; only ids other than the restore
        // checkpoint id match the pruning predicate.
        this.localStateStore.pruneMatchingCheckpoints(checkpointId -> checkpointId != restoreCheckpointId);

        List<OperatorSubtaskState> alternativesByPriority = Collections.emptyList();
        if (localStateSnapshot != null) {
            final OperatorSubtaskState localSubtaskState = localStateSnapshot.getSubtaskStateByOperatorID(operatorID);
            if (localSubtaskState != null) {
                alternativesByPriority = Collections.singletonList(localSubtaskState);
            }
        }

        LOG.debug("Operator {} has remote state {} from job manager and local state alternatives {} from local state store {}.", operatorID, jobManagerSubtaskState, alternativesByPriority, this.localStateStore);

        // NOTE(review): the trailing 'true' presumably marks the state as restored (compare
        // emptyNotRestored() above) - confirm against PrioritizedOperatorSubtaskState.Builder.
        return new PrioritizedOperatorSubtaskState.Builder(jobManagerSubtaskState, alternativesByPriority, true).build();
    }

    /**
     * Returns the local recovery configuration as provided by the local state store.
     *
     * @return the configuration obtained from {@link TaskLocalStateStore#getLocalRecoveryConfig()}
     */
    @Override
    @Nonnull
    public LocalRecoveryConfig createLocalRecoveryConfig() {
        return this.localStateStore.getLocalRecoveryConfig();
    }

    /**
     * Forwards the completion notification of a checkpoint to the local state store.
     *
     * @param j id of the completed checkpoint, passed to
     *     {@link TaskLocalStateStore#confirmCheckpoint(long)}
     * @throws Exception declared by the implemented listener method; this method itself only
     *     delegates to the store
     */
    @Override
    public void notifyCheckpointComplete(long j) throws Exception {
        this.localStateStore.confirmCheckpoint(j);
    }
}
