package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/Execution.class */
public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {
    // Logger shared with DefaultExecutionGraph.
    private static final Logger LOG = DefaultExecutionGraph.LOG;
    // NOTE(review): presumably the number of tries for cancel RPC calls — confirm against sendCancelRpcCall.
    private static final int NUM_CANCEL_CALL_TRIES = 3;
    // Executor on which deploy() runs the task submission.
    private final Executor executor;
    // Vertex this execution was created for.
    private final ExecutionVertex vertex;
    // Built in the constructor from the graph ID, the vertex ID and the attempt number.
    private final ExecutionAttemptID attemptId;
    // Timeout handed to the task submission RPC in deploy().
    private final Time rpcTimeout;
    // Filled by cachePartitionInfo(), flushed and cleared by sendPartitionInfos().
    private final Collection<PartitionInfo> partitionInfos;
    // Exposed via getTerminalStateFuture(); completion happens outside the visible section.
    private final CompletableFuture<ExecutionState> terminalStateFuture;
    // Exposed via getReleaseFuture() and returned by suspend().
    private final CompletableFuture<?> releaseFuture;
    // Completed in tryAssignResource(), cancelled in startCancelling().
    private final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture;
    // Exposed via getInitializingOrRunningFuture(); completion happens outside the visible section.
    private final CompletableFuture<?> initializingOrRunningFuture;
    // Slot bound by tryAssignResource(); null until then.
    private LogicalSlot assignedResource;

    // Set by setInitialState(), reset to null in deploy() once the deployment descriptor is built.
    @Nullable
    private JobManagerTaskRestore taskRestore;

    // Allocation ID of the slot accepted in tryAssignResource().
    @Nullable
    private AllocationID assignedAllocationID;
    // NOTE(review): presumably written by updateAccumulatorsAndMetrics — not visible here.
    private Map<String, Accumulator<?, ?>> userAccumulators;
    // NOTE(review): presumably written by updateAccumulatorsAndMetrics — not visible here.
    private IOMetrics ioMetrics;
    // Empty until registerProducedPartitions() stores the registered descriptors.
    private Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> producedPartitions;
    // Current state of this attempt; starts in CREATED.
    private volatile ExecutionState state = ExecutionState.CREATED;
    // Set in processFail() when the execution transitions to FAILED.
    private Optional<ErrorInfo> failureCause = Optional.empty();
    // NOTE(review): presumably guards userAccumulators — its use is not visible here.
    private final Object accumulatorLock = new Object();
    // Indexed by ExecutionState ordinal (see getStateTimestamp).
    private final long[] stateTimestamps = new long[ExecutionState.values().length];
    // Indexed by ExecutionState ordinal (see getStateEndTimestamp).
    private final long[] stateEndTimestamps = new long[ExecutionState.values().length];

    /**
     * Creates a new execution attempt in state {@link ExecutionState#CREATED}.
     *
     * @param executor executor used for the asynchronous task submission in {@link #deploy()}; must not be null
     * @param executionVertex vertex this attempt is created for; must not be null
     * @param i attempt number that becomes part of the {@link ExecutionAttemptID}
     * @param j timestamp recorded for the CREATED state
     * @param time timeout handed to the task submission RPC; must not be null
     */
    public Execution(Executor executor, ExecutionVertex executionVertex, int i, long j, Time time) {
        this.executor = Preconditions.checkNotNull(executor);
        this.vertex = Preconditions.checkNotNull(executionVertex);
        this.attemptId =
                new ExecutionAttemptID(
                        executionVertex.getExecutionGraphAccessor().getExecutionGraphID(),
                        executionVertex.getID(),
                        i);
        this.rpcTimeout = Preconditions.checkNotNull(time);

        markTimestamp(ExecutionState.CREATED, j);

        this.partitionInfos = new ArrayList<>(16);
        this.producedPartitions = Collections.emptyMap();

        this.terminalStateFuture = new CompletableFuture<>();
        this.releaseFuture = new CompletableFuture<>();
        this.taskManagerLocationFuture = new CompletableFuture<>();
        this.initializingOrRunningFuture = new CompletableFuture<>();

        // No slot is bound until tryAssignResource succeeds.
        this.assignedResource = null;
    }

    /** Returns the {@link ExecutionVertex} this execution was created for. */
    public ExecutionVertex getVertex() {
        return this.vertex;
    }

    /** Returns the attempt ID that was built in the constructor for this execution. */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecution
    public ExecutionAttemptID getAttemptId() {
        return this.attemptId;
    }

    /** Returns the attempt number stored in this execution's {@link ExecutionAttemptID}. */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecution
    public int getAttemptNumber() {
        return this.attemptId.getAttemptNumber();
    }

    /** Returns the current value of the volatile {@code state} field. */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecution
    public ExecutionState getState() {
        return this.state;
    }

    /**
     * Returns the allocation ID recorded by {@link #tryAssignResource(LogicalSlot)}, or
     * {@code null} if no slot has been accepted yet.
     */
    @Nullable
    public AllocationID getAssignedAllocationID() {
        return this.assignedAllocationID;
    }

    /**
     * Returns the future that {@link #tryAssignResource(LogicalSlot)} completes with the slot's
     * TaskManager location and that cancellation cancels.
     */
    public CompletableFuture<TaskManagerLocation> getTaskManagerLocationFuture() {
        return this.taskManagerLocationFuture;
    }

    /** Returns the slot currently bound to this execution; {@code null} if none is bound. */
    public LogicalSlot getAssignedResource() {
        return this.assignedResource;
    }

    /**
     * Looks up the deployment descriptor of a produced partition.
     *
     * @param intermediateResultPartitionID ID of the partition to look up
     * @return the descriptor stored in {@code producedPartitions}, or empty if there is no entry
     */
    public Optional<ResultPartitionDeploymentDescriptor> getResultPartitionDeploymentDescriptor(IntermediateResultPartitionID intermediateResultPartitionID) {
        return Optional.ofNullable(this.producedPartitions.get(intermediateResultPartitionID));
    }

    /**
     * Tries to bind the given slot to this execution.
     *
     * <p>The slot is accepted only while the execution is CREATED or SCHEDULED and no slot is
     * bound yet. On success the TaskManager location future is completed and the allocation is
     * recorded on this execution and on the vertex.
     *
     * @param logicalSlot slot to bind; must not be null
     * @return {@code true} if the slot was bound, {@code false} otherwise
     */
    public boolean tryAssignResource(LogicalSlot logicalSlot) {
        assertRunningInJobMasterMainThread();
        Preconditions.checkNotNull(logicalSlot);

        final boolean assignable =
                (state == ExecutionState.SCHEDULED || state == ExecutionState.CREATED)
                        && assignedResource == null;
        if (!assignable) {
            return false;
        }

        assignedResource = logicalSlot;

        // The state and the location future are checked again after the payload hand-off;
        // if any of the checks fails, the tentative assignment is rolled back.
        final boolean bound =
                logicalSlot.tryAssignPayload(this)
                        && (state == ExecutionState.SCHEDULED || state == ExecutionState.CREATED)
                        && !taskManagerLocationFuture.isDone();
        if (!bound) {
            assignedResource = null;
            return false;
        }

        taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
        assignedAllocationID = logicalSlot.getAllocationId();
        getVertex()
                .setLatestPriorSlotAllocation(
                        assignedResource.getTaskManagerLocation(), logicalSlot.getAllocationId());
        return true;
    }

    /**
     * Requests the next input split from the vertex.
     *
     * <p>The host name of the assigned slot's TaskManager is passed along; it is {@code null}
     * when no slot is assigned.
     *
     * @return whatever the vertex returns for this host and attempt number
     */
    public Optional<InputSplit> getNextInputSplit() {
        final LogicalSlot slot = getAssignedResource();
        String host = null;
        if (slot != null) {
            host = slot.getTaskManagerLocation().getHostname();
        }
        return vertex.getNextInputSplit(host, getAttemptNumber());
    }

    /**
     * Returns the TaskManager location of the assigned slot, or {@code null} if no slot is
     * assigned.
     */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecution
    public TaskManagerLocation getAssignedResourceLocation() {
        // Read the field once so the null check and the dereference see the same slot.
        final LogicalSlot slot = assignedResource;
        return slot != null ? slot.getTaskManagerLocation() : null;
    }

    /** Returns the recorded failure cause; empty unless the execution has been marked FAILED. */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecution
    public Optional<ErrorInfo> getFailureInfo() {
        return this.failureCause;
    }

    /**
     * Returns the per-state timestamps, indexed by {@link ExecutionState} ordinal. The internal
     * array itself is returned, not a copy.
     */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecution
    public long[] getStateTimestamps() {
        return this.stateTimestamps;
    }

    /**
     * Returns the per-state end timestamps, indexed by {@link ExecutionState} ordinal. The
     * internal array itself is returned, not a copy.
     */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecution
    public long[] getStateEndTimestamps() {
        return this.stateEndTimestamps;
    }

    /**
     * Returns the timestamp stored for the given state.
     *
     * @param executionState state whose ordinal selects the array slot
     */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecution
    public long getStateTimestamp(ExecutionState executionState) {
        return this.stateTimestamps[executionState.ordinal()];
    }

    /**
     * Returns the end timestamp stored for the given state.
     *
     * @param executionState state whose ordinal selects the array slot
     */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecution
    public long getStateEndTimestamp(ExecutionState executionState) {
        return this.stateEndTimestamps[executionState.ordinal()];
    }

    /**
     * Returns whether the current state is terminal according to
     * {@code ExecutionState.isTerminal()} (not only the FINISHED state, despite the name).
     */
    public boolean isFinished() {
        return this.state.isTerminal();
    }

    /**
     * Returns the restore information set via {@link #setInitialState(JobManagerTaskRestore)};
     * {@code null} if none was set or after {@link #deploy()} has cleared it.
     */
    @Nullable
    public JobManagerTaskRestore getTaskRestore() {
        return this.taskRestore;
    }

    /**
     * Stores the restore information that {@link #deploy()} passes into the deployment
     * descriptor.
     *
     * @param jobManagerTaskRestore restore information to use for the deployment
     */
    public void setInitialState(JobManagerTaskRestore jobManagerTaskRestore) {
        this.taskRestore = jobManagerTaskRestore;
    }

    /**
     * Returns the future created in the constructor for this purpose.
     * NOTE(review): presumably completed when the task reaches INITIALIZING or RUNNING — the
     * completion site is not visible here.
     */
    public CompletableFuture<?> getInitializingOrRunningFuture() {
        return this.initializingOrRunningFuture;
    }

    /**
     * Returns the terminal-state future of this slot payload.
     * NOTE(review): presumably completed with the terminal state once one is reached — the
     * completion site is not visible here.
     */
    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot.Payload
    public CompletableFuture<ExecutionState> getTerminalStateFuture() {
        return this.terminalStateFuture;
    }

    /**
     * Returns the release future, the same instance that {@link #suspend()} returns.
     * NOTE(review): presumably completed once the assigned resource is released — the
     * completion site is not visible here.
     */
    public CompletableFuture<?> getReleaseFuture() {
        return this.releaseFuture;
    }

    /**
     * Registers the partitions produced by this execution and stores the resulting descriptors.
     *
     * <p>Once registration completes, the descriptors are stored on this execution. If the
     * execution is still SCHEDULED, tracking of the partitions starts; otherwise the late
     * registration is logged and every partition is released again through the shuffle master.
     *
     * @param taskManagerLocation location of the TaskManager that will produce the partitions
     * @return future that completes after the registration result has been processed
     */
    public CompletableFuture<Void> registerProducedPartitions(TaskManagerLocation taskManagerLocation) {
        assertRunningInJobMasterMainThread();

        final CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>> registration =
                registerProducedPartitions(vertex, taskManagerLocation, attemptId);
        final Executor mainThreadExecutor =
                vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor();

        return FutureUtils.thenApplyAsyncIfNotDone(
                registration,
                mainThreadExecutor,
                descriptors -> {
                    producedPartitions = descriptors;
                    if (getState() != ExecutionState.SCHEDULED) {
                        LOG.info("Discarding late registered partitions for {} task {}.", getState(), attemptId);
                        for (ResultPartitionDeploymentDescriptor descriptor : descriptors.values()) {
                            getVertex()
                                    .getExecutionGraphAccessor()
                                    .getShuffleMaster()
                                    .releasePartitionExternally(descriptor.getShuffleDescriptor());
                        }
                    } else {
                        startTrackingPartitions(taskManagerLocation.getResourceID(), descriptors.values());
                    }
                    return null;
                });
    }

    /**
     * Registers every partition produced by the given vertex with the shuffle master and
     * collects the resulting deployment descriptors.
     *
     * @param executionVertex vertex whose produced partitions are registered
     * @param taskManagerLocation location of the producing TaskManager
     * @param executionAttemptID attempt that will produce the partitions
     * @return future holding the descriptors keyed by partition ID, in the order the
     *     registrations were combined
     */
    private static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>> registerProducedPartitions(ExecutionVertex executionVertex, TaskManagerLocation taskManagerLocation, ExecutionAttemptID executionAttemptID) {
        final ProducerDescriptor producerDescriptor =
                ProducerDescriptor.create(taskManagerLocation, executionAttemptID);
        final Collection<IntermediateResultPartition> partitions =
                executionVertex.getProducedPartitions().values();
        final Collection<CompletableFuture<ResultPartitionDeploymentDescriptor>> registrations =
                new ArrayList<>(partitions.size());

        for (IntermediateResultPartition partition : partitions) {
            final PartitionDescriptor partitionDescriptor = PartitionDescriptor.from(partition);
            final int maxParallelism = getPartitionMaxParallelism(partition);
            registrations.add(
                    executionVertex
                            .getExecutionGraphAccessor()
                            .getShuffleMaster()
                            .registerPartitionWithProducer(
                                    executionVertex.getJobId(), partitionDescriptor, producerDescriptor)
                            .thenApply(
                                    shuffleDescriptor ->
                                            new ResultPartitionDeploymentDescriptor(
                                                    partitionDescriptor, shuffleDescriptor, maxParallelism)));
        }

        return FutureUtils.combineAll(registrations)
                .thenApply(
                        descriptors -> {
                            final Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> result =
                                    new LinkedHashMap<>(partitions.size());
                            // Each descriptor must be stored under its partition ID; without
                            // this the returned map stays empty and no partition is tracked.
                            descriptors.forEach(
                                    descriptor -> result.put(descriptor.getPartitionId(), descriptor));
                            return result;
                        });
    }

    /**
     * Returns the consumers' max parallelism of the intermediate result the given partition
     * belongs to.
     */
    private static int getPartitionMaxParallelism(IntermediateResultPartition intermediateResultPartition) {
        return intermediateResultPartition.getIntermediateResult().getConsumersMaxParallelism();
    }

    /**
     * Deploys this execution to the previously assigned slot.
     *
     * <p>The execution must be SCHEDULED and is moved to DEPLOYING. The task submission itself
     * runs on {@code executor}; its outcome is handled on the job master main thread executor.
     * Any failure after the state transition is reported via {@link #markFailed(Throwable)}.
     *
     * @throws JobException if the assigned slot is no longer alive
     * @throws IllegalStateException if the execution is not SCHEDULED, the transition to
     *     DEPLOYING fails, or the slot's payload is not this execution
     */
    public void deploy() throws JobException {
        assertRunningInJobMasterMainThread();

        final LogicalSlot slot = assignedResource;
        Preconditions.checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

        if (!slot.isAlive()) {
            throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
        }

        // Only a single deployment may start, and only from SCHEDULED.
        final ExecutionState previous = state;
        if (previous != ExecutionState.SCHEDULED) {
            throw new IllegalStateException("The vertex must be in SCHEDULED state to be deployed. Found state " + previous);
        }
        if (!transitionState(previous, ExecutionState.DEPLOYING)) {
            throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
        }

        if (this != slot.getPayload()) {
            throw new IllegalStateException(String.format("The execution %s has not been assigned to the assigned slot.", this));
        }

        try {
            // Double check: give the slot back if the state changed in the meantime.
            if (state != ExecutionState.DEPLOYING) {
                slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
                return;
            }

            LOG.info(
                    "Deploying {} (attempt #{}) with attempt id {} and vertex id {} to {} with allocation id {}",
                    vertex.getTaskNameWithSubtaskIndex(),
                    getAttemptNumber(),
                    attemptId,
                    vertex.getID(),
                    getAssignedResourceLocation(),
                    slot.getAllocationId());

            final TaskDeploymentDescriptor deployment =
                    TaskDeploymentDescriptorFactory.fromExecution(this)
                            .createDeploymentDescriptor(
                                    slot.getAllocationId(), taskRestore, producedPartitions.values());

            // The restore information is not needed on this object after the descriptor is built.
            taskRestore = null;

            final TaskManagerGateway gateway = slot.getTaskManagerGateway();
            final Executor mainThreadExecutor =
                    vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor();

            getVertex().notifyPendingDeployment(this);

            CompletableFuture.supplyAsync(() -> gateway.submitTask(deployment, rpcTimeout), executor)
                    .thenCompose(Function.identity())
                    .whenCompleteAsync(
                            (ack, failure) -> {
                                if (failure == null) {
                                    vertex.notifyCompletedDeployment(this);
                                    return;
                                }
                                final Throwable actualFailure = ExceptionUtils.stripCompletionException(failure);
                                if (actualFailure instanceof TimeoutException) {
                                    final String taskName = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
                                    markFailed(new Exception("Cannot deploy task " + taskName + " - TaskManager (" + getAssignedResourceLocation() + ") not responding after a rpcTimeout of " + rpcTimeout, actualFailure));
                                } else {
                                    markFailed(actualFailure);
                                }
                            },
                            mainThreadExecutor);
        } catch (Throwable t) {
            markFailed(t);
        }
    }

    /**
     * Cancels this execution according to its current state.
     *
     * <ul>
     *   <li>CANCELING, CANCELED, FAILED: nothing to do.
     *   <li>INITIALIZING, RUNNING, DEPLOYING: start cancelling with a cancel RPC.
     *   <li>FINISHED: send the release-intermediate-result-partitions RPC.
     *   <li>CREATED, SCHEDULED: cancel atomically.
     * </ul>
     *
     * <p>The loop retries when a state transition attempt fails.
     *
     * @throws IllegalStateException for any other state
     */
    public void cancel() {
        assertRunningInJobMasterMainThread();
        for (;;) {
            final ExecutionState current = state;
            switch (current) {
                case CANCELING:
                case CANCELED:
                case FAILED:
                    return;
                case INITIALIZING:
                case RUNNING:
                case DEPLOYING:
                    if (startCancelling(NUM_CANCEL_CALL_TRIES)) {
                        return;
                    }
                    break;
                case FINISHED:
                    sendReleaseIntermediateResultPartitionsRpcCall();
                    return;
                case CREATED:
                case SCHEDULED:
                    if (cancelAtomically()) {
                        return;
                    }
                    break;
                default:
                    throw new IllegalStateException(current.name());
            }
        }
    }

    /**
     * Suspends this execution.
     *
     * <p>Executions that have not reached a terminal or cancelling state are cancelled
     * atomically, a CANCELING execution is completed, and a FINISHED execution gets the
     * release-intermediate-result-partitions RPC. FAILED and CANCELED executions are left
     * untouched.
     *
     * @return the release future of this execution
     * @throws IllegalStateException if the atomic cancel does not succeed or the state is not
     *     handled
     */
    public CompletableFuture<?> suspend() {
        final ExecutionState current = state;

        if (current == ExecutionState.RUNNING
                || current == ExecutionState.INITIALIZING
                || current == ExecutionState.DEPLOYING
                || current == ExecutionState.CREATED
                || current == ExecutionState.SCHEDULED) {
            if (!cancelAtomically()) {
                throw new IllegalStateException(String.format("Could not directly go to %s from %s.", ExecutionState.CANCELED.name(), state.name()));
            }
        } else if (current == ExecutionState.CANCELING) {
            completeCancelling();
        } else if (current == ExecutionState.FINISHED) {
            sendReleaseIntermediateResultPartitionsRpcCall();
        } else if (current != ExecutionState.FAILED && current != ExecutionState.CANCELED) {
            throw new IllegalStateException(state.name());
        }

        return releaseFuture;
    }

    /**
     * Informs the consumers of the given partition about it.
     *
     * <p>Each consumer vertex whose current attempt is DEPLOYING, RUNNING or INITIALIZING is
     * handled at most once. For a DEPLOYING consumer the partition info is cached on the
     * consumer vertex; otherwise it is sent to the consumer attempt right away.
     *
     * @param intermediateResultPartition partition whose consumers are updated
     */
    private void updatePartitionConsumers(IntermediateResultPartition intermediateResultPartition) {
        final List<ConsumerVertexGroup> consumerGroups = intermediateResultPartition.getConsumerVertexGroups();
        if (consumerGroups.isEmpty()) {
            return;
        }

        final Set<ExecutionVertexID> handled = new HashSet<>();
        for (ConsumerVertexGroup group : consumerGroups) {
            for (ExecutionVertexID consumerId : group) {
                if (handled.contains(consumerId)) {
                    continue;
                }

                final ExecutionVertex consumerVertex =
                        vertex.getExecutionGraphAccessor().getExecutionVertexOrThrow(consumerId);
                final Execution consumer = consumerVertex.getCurrentExecutionAttempt();
                final ExecutionState consumerState = consumer.getState();

                final boolean interested =
                        consumerState == ExecutionState.DEPLOYING
                                || consumerState == ExecutionState.RUNNING
                                || consumerState == ExecutionState.INITIALIZING;
                if (!interested) {
                    continue;
                }

                final PartitionInfo info = createPartitionInfo(intermediateResultPartition);
                handled.add(consumerId);
                if (consumerState == ExecutionState.DEPLOYING) {
                    consumerVertex.cachePartitionInfo(info);
                } else {
                    consumer.sendUpdatePartitionInfoRpcCall(Collections.singleton(info));
                }
            }
        }
    }

    /**
     * Builds the {@link PartitionInfo} for a consumed partition from the ID of its intermediate
     * result and its shuffle descriptor. The descriptor is requested with the
     * {@code MUST_BE_KNOWN} location constraint.
     */
    private static PartitionInfo createPartitionInfo(IntermediateResultPartition intermediateResultPartition) {
        return new PartitionInfo(intermediateResultPartition.getIntermediateResult().getId(), TaskDeploymentDescriptorFactory.getConsumedPartitionShuffleDescriptor(intermediateResultPartition, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN));
    }

    /**
     * Slot payload callback: processes the failure with the second {@code processFail} flag set
     * to {@code true} (the flag that gates sending a cancel RPC to the TaskManager).
     *
     * @param th cause of the failure
     */
    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot.Payload
    public void fail(Throwable th) {
        processFail(th, true);
    }

    /**
     * Forwards a checkpoint-complete notification to the TaskManager of the assigned slot. If
     * no slot is assigned, only a debug message is logged.
     *
     * <p>NOTE(review): the three {@code long} arguments are passed through unchanged;
     * presumably checkpoint ID(s) and a timestamp — confirm against {@code TaskManagerGateway}.
     */
    public void notifyCheckpointOnComplete(long j, long j2, long j3) {
        final LogicalSlot slot = assignedResource;
        if (slot == null) {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
            return;
        }
        slot.getTaskManagerGateway()
                .notifyCheckpointOnComplete(attemptId, getVertex().getJobId(), j, j2, j3);
    }

    /**
     * Forwards a checkpoint-aborted notification to the TaskManager of the assigned slot. If no
     * slot is assigned, only a debug message is logged.
     *
     * <p>NOTE(review): the three {@code long} arguments are passed through unchanged;
     * presumably checkpoint ID(s) and a timestamp — confirm against {@code TaskManagerGateway}.
     */
    public void notifyCheckpointAborted(long j, long j2, long j3) {
        final LogicalSlot slot = assignedResource;
        if (slot == null) {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
            return;
        }
        slot.getTaskManagerGateway()
                .notifyCheckpointAborted(attemptId, getVertex().getJobId(), j, j2, j3);
    }

    /**
     * Triggers a checkpoint on the task of this execution; delegates to
     * {@code triggerCheckpointHelper} with the arguments unchanged.
     *
     * @return the acknowledgement future returned by the helper
     */
    public CompletableFuture<Acknowledge> triggerCheckpoint(long j, long j2, CheckpointOptions checkpointOptions) {
        return triggerCheckpointHelper(j, j2, checkpointOptions);
    }

    /**
     * Triggers a synchronous savepoint on the task of this execution. In the visible code this
     * takes exactly the same path as {@link #triggerCheckpoint(long, long, CheckpointOptions)}.
     *
     * @return the acknowledgement future returned by the helper
     */
    public CompletableFuture<Acknowledge> triggerSynchronousSavepoint(long j, long j2, CheckpointOptions checkpointOptions) {
        return triggerCheckpointHelper(j, j2, checkpointOptions);
    }

    /**
     * Sends the trigger-checkpoint call to the TaskManager of the assigned slot.
     *
     * @return the gateway's acknowledgement future, or an already completed acknowledgement if
     *     no slot is assigned (in which case only a debug message is logged)
     */
    private CompletableFuture<Acknowledge> triggerCheckpointHelper(long j, long j2, CheckpointOptions checkpointOptions) {
        final LogicalSlot slot = assignedResource;
        if (slot == null) {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        return slot.getTaskManagerGateway()
                .triggerCheckpoint(attemptId, getVertex().getJobId(), j, j2, checkpointOptions);
    }

    /**
     * Sends an operator event to the task of this execution.
     *
     * @param operatorID operator the event is addressed to
     * @param serializedValue serialized event
     * @return the gateway's acknowledgement future, or a future failed with
     *     {@link TaskNotRunningException} if no slot is assigned or the execution is neither
     *     RUNNING nor INITIALIZING
     */
    public CompletableFuture<Acknowledge> sendOperatorEvent(OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue) {
        assertRunningInJobMasterMainThread();

        final LogicalSlot slot = assignedResource;
        if (slot != null
                && (getState() == ExecutionState.RUNNING || getState() == ExecutionState.INITIALIZING)) {
            return slot.getTaskManagerGateway()
                    .sendOperatorEventToTask(getAttemptId(), operatorID, serializedValue);
        }

        return FutureUtils.completedExceptionally(new TaskNotRunningException('\"' + vertex.getTaskNameWithSubtaskIndex() + "\" is not running, but in state " + getState()));
    }

    /**
     * Processes the failure with the second {@code processFail} flag set to {@code false} (the
     * flag that gates sending a cancel RPC to the TaskManager).
     *
     * @param th cause of the failure
     */
    public void markFailed(Throwable th) {
        processFail(th, false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Processes a failure; all arguments are passed unchanged to the six-argument
     * {@code processFail}.
     *
     * @param th cause of the failure
     * @param z gates the cancel RPC to the TaskManager
     * @param map user accumulators to record, may be null
     * @param iOMetrics I/O metrics to record, may be null
     * @param z2 passed on to the partition cleanup
     * @param z3 when {@code false}, the failure is only reported to the scheduler
     */
    public void markFailed(Throwable th, boolean z, Map<String, Accumulator<?, ?>> map, IOMetrics iOMetrics, boolean z2, boolean z3) {
        processFail(th, z, map, iOMetrics, z2, z3);
    }

    /** Marks this execution as finished without accumulators or I/O metrics (both null). */
    @VisibleForTesting
    public void markFinished() {
        markFinished(null, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles the notification that the task of this execution has finished.
     *
     * <ul>
     *   <li>INITIALIZING, RUNNING, DEPLOYING: transition to FINISHED, finish the partitions and
     *       update their consumers, record accumulators and metrics, release the slot and
     *       deregister the execution. The vertex is always notified via
     *       {@code executionFinished}, even if one of these steps throws.
     *   <li>CANCELING: complete the cancellation instead.
     *   <li>CANCELED, FAILED: ignore, with a debug message.
     *   <li>Any other state: mark the execution as failed.
     * </ul>
     *
     * <p>If the transition to FINISHED fails, the state is read again and the decision is
     * repeated.
     *
     * @param map user accumulators reported by the task, may be null
     * @param iOMetrics I/O metrics reported by the task, may be null
     */
    public void markFinished(Map<String, Accumulator<?, ?>> map, IOMetrics iOMetrics) {
        assertRunningInJobMasterMainThread();

        while (true) {
            final ExecutionState current = this.state;

            if (current == ExecutionState.INITIALIZING
                    || current == ExecutionState.RUNNING
                    || current == ExecutionState.DEPLOYING) {
                if (!transitionState(current, ExecutionState.FINISHED)) {
                    // The transition failed; re-read the state and decide again.
                    continue;
                }
                try {
                    finishPartitionsAndUpdateConsumers();
                    updateAccumulatorsAndMetrics(map, iOMetrics);
                    releaseAssignedResource(null);
                    this.vertex.getExecutionGraphAccessor().deregisterExecution(this);
                } finally {
                    this.vertex.executionFinished(this);
                }
                return;
            }

            if (current == ExecutionState.CANCELING) {
                completeCancelling(map, iOMetrics, true);
                return;
            }

            if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Task FINISHED, but concurrently went to state " + this.state);
                }
                return;
            }

            markFailed(new Exception("Vertex received FINISHED message while being in state " + this.state));
            return;
        }
    }

    /**
     * Finishes all blocking partitions of the vertex and updates the consumers of every
     * partition returned by that call.
     */
    private void finishPartitionsAndUpdateConsumers() {
        for (IntermediateResultPartition finishedPartition : getVertex().finishAllBlockingPartitions()) {
            updatePartitionConsumers(finishedPartition);
        }
    }

    /**
     * Starts cancelling with zero cancel tries and, if that succeeds, completes the cancellation
     * immediately.
     *
     * @return {@code true} if the cancellation was started and completed, {@code false} if the
     *     transition to CANCELING failed
     */
    private boolean cancelAtomically() {
        if (startCancelling(0)) {
            completeCancelling();
            return true;
        }
        return false;
    }

    /**
     * Tries to move the execution from its current state to CANCELING. On success the
     * TaskManager location future is cancelled and the cancel RPC is issued.
     *
     * @param i value passed to {@code sendCancelRpcCall}
     * @return {@code true} if the transition to CANCELING succeeded
     */
    private boolean startCancelling(int i) {
        final boolean transitioned = transitionState(state, ExecutionState.CANCELING);
        if (transitioned) {
            taskManagerLocationFuture.cancel(false);
            sendCancelRpcCall(i);
        }
        return transitioned;
    }

    /**
     * Completes the cancellation without accumulators or I/O metrics (both null) and with the
     * boolean flag set to {@code true}.
     */
    void completeCancelling() {
        completeCancelling(null, null, true);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Moves the execution to CANCELED and finishes the cancellation.
     *
     * <p>Nothing happens if the execution is already CANCELED or FAILED. From CANCELING,
     * RUNNING, INITIALIZING or DEPLOYING the accumulators and metrics are recorded and the
     * transition to CANCELED is attempted, retrying if it fails. Any other state is logged as an
     * error and fails the execution graph globally.
     *
     * @param map user accumulators to record, may be null
     * @param iOMetrics I/O metrics to record, may be null
     * @param z passed to {@code finishCancellation}, which forwards it to the partition cleanup
     */
    public void completeCancelling(Map<String, Accumulator<?, ?>> map, IOMetrics iOMetrics, boolean z) {
        while (true) {
            final ExecutionState current = state;

            if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) {
                return;
            }

            final boolean cancellable =
                    current == ExecutionState.CANCELING
                            || current == ExecutionState.RUNNING
                            || current == ExecutionState.INITIALIZING
                            || current == ExecutionState.DEPLOYING;
            if (!cancellable) {
                final String message = String.format("Asynchronous race: Found %s in state %s after successful cancel call.", vertex.getTaskNameWithSubtaskIndex(), state);
                LOG.error(message);
                vertex.getExecutionGraphAccessor().failGlobal(new Exception(message));
                return;
            }

            updateAccumulatorsAndMetrics(map, iOMetrics);
            if (transitionState(current, ExecutionState.CANCELED)) {
                finishCancellation(z);
                return;
            }
            // The transition failed; loop and decide again from the fresh state.
        }
    }

    /**
     * Final cancellation steps: releases the assigned resource with a "was cancelled" cause,
     * deregisters the execution from the graph and runs the partition cleanup.
     *
     * @param z passed as both arguments to {@code handlePartitionCleanup}
     */
    private void finishCancellation(boolean z) {
        final FlinkException cause = new FlinkException("Execution " + this + " was cancelled.");
        releaseAssignedResource(cause);
        vertex.getExecutionGraphAccessor().deregisterExecution(this);
        handlePartitionCleanup(z, z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Buffers a partition info until {@code sendPartitionInfos} flushes the buffer.
     *
     * @param partitionInfo info to buffer
     */
    public void cachePartitionInfo(PartitionInfo partitionInfo) {
        this.partitionInfos.add(partitionInfo);
    }

    /**
     * Sends all buffered partition infos in a single update call and clears the buffer. Does
     * nothing if the buffer is empty.
     */
    private void sendPartitionInfos() {
        if (!partitionInfos.isEmpty()) {
            // Send a copy so that clearing the buffer does not affect the list handed out.
            final List<PartitionInfo> snapshot = new ArrayList<>(partitionInfos);
            sendUpdatePartitionInfoRpcCall(snapshot);
            partitionInfos.clear();
        }
    }

    /**
     * Processes a failure without accumulators or metrics, with the partition cleanup flag set
     * to {@code true} and the last flag set to {@code false}, so that the failure is only
     * reported to the scheduler (see the six-argument overload).
     *
     * @param th cause of the failure
     * @param z gates the cancel RPC to the TaskManager
     */
    private void processFail(Throwable th, boolean z) {
        processFail(th, z, null, null, true, false);
    }

    /**
     * Handles a failure of this execution.
     *
     * <ul>
     *   <li>FAILED: nothing to do.
     *   <li>CANCELED, FINISHED: ignored, with a debug message.
     *   <li>CANCELING: the cancellation is completed instead.
     *   <li>Otherwise, if {@code z3} is {@code false}, the failure is only reported to the
     *       scheduler; if {@code z3} is {@code true}, the execution transitions to FAILED, the
     *       failure cause, accumulators and metrics are recorded, the slot is released, the
     *       execution is deregistered and partitions/cancel RPC are handled.
     * </ul>
     *
     * @param th cause of the failure
     * @param z gates the cancel RPC to the TaskManager
     * @param map user accumulators to record, may be null
     * @param iOMetrics I/O metrics to record, may be null
     * @param z2 passed on to the partition cleanup
     * @param z3 whether the transition to FAILED is performed here
     */
    private void processFail(Throwable th, boolean z, Map<String, Accumulator<?, ?>> map, IOMetrics iOMetrics, boolean z2, boolean z3) {
        assertRunningInJobMasterMainThread();

        final ExecutionState current = state;
        if (current == ExecutionState.FAILED) {
            return;
        }

        if (current == ExecutionState.CANCELED || current == ExecutionState.FINISHED) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignoring transition of vertex {} to {} while being {}.", getVertexWithAttempt(), ExecutionState.FAILED, current);
            }
            return;
        }

        if (current == ExecutionState.CANCELING) {
            completeCancelling(map, iOMetrics, true);
            return;
        }

        if (!z3) {
            vertex.getExecutionGraphAccessor().notifySchedulerNgAboutInternalTaskFailure(attemptId, th, z, z2);
            return;
        }

        Preconditions.checkState(transitionState(current, ExecutionState.FAILED, th));
        failureCause = Optional.of(ErrorInfo.createErrorInfoWithNullableCause(th, getStateTimestamp(ExecutionState.FAILED)));
        updateAccumulatorsAndMetrics(map, iOMetrics);
        releaseAssignedResource(th);
        vertex.getExecutionGraphAccessor().deregisterExecution(this);
        maybeReleasePartitionsAndSendCancelRpcCall(current, z, z2);
    }

    /**
     * Cleans up the produced partitions and, if requested, asks the TaskManager to cancel the
     * task.
     *
     * <p>The cancel RPC is only sent when {@code z} is set, the given state is
     * {@code RUNNING}, {@code INITIALIZING} or {@code DEPLOYING}, and a resource is assigned.
     * Any error raised while triggering the RPC is logged and not propagated.
     *
     * @param executionState the state the execution was in before it failed
     * @param z whether a cancel RPC may be sent
     * @param z2 forwarded as both flags of {@link #handlePartitionCleanup(boolean, boolean)}
     */
    private void maybeReleasePartitionsAndSendCancelRpcCall(ExecutionState executionState, boolean z, boolean z2) {
        handlePartitionCleanup(z2, z2);

        if (!z) {
            return;
        }

        final boolean mayExistOnTaskManager =
                executionState == ExecutionState.RUNNING
                        || executionState == ExecutionState.INITIALIZING
                        || executionState == ExecutionState.DEPLOYING;
        if (!mayExistOnTaskManager) {
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending out cancel request, to remove task execution from TaskManager.");
        }
        try {
            if (this.assignedResource != null) {
                sendCancelRpcCall(3);
            }
        } catch (Throwable cancelError) {
            LOG.error("Error triggering cancel call while marking task {} as failed.", getVertex().getTaskNameWithSubtaskIndex(), cancelError);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Attempts the {@code DEPLOYING -> INITIALIZING} transition and, on success, flushes the
     * buffered partition infos.
     *
     * @return {@code true} if the transition took place, {@code false} otherwise
     */
    public boolean switchToRecovering() {
        final boolean switched = switchTo(ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        if (switched) {
            sendPartitionInfos();
        }
        return switched;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Attempts the {@code INITIALIZING -> RUNNING} transition.
     *
     * @return {@code true} if the transition took place, {@code false} otherwise
     */
    public boolean switchToRunning() {
        final boolean switched = switchTo(ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        return switched;
    }

    /**
     * Tries to move this execution from one state to another and reacts to the state actually
     * found when the transition does not apply.
     *
     * <ul>
     *   <li>{@code FINISHED} / {@code CANCELED}: nothing is done.
     *   <li>{@code CANCELING} / {@code FAILED}: a cancel RPC is sent.
     *   <li>anything else: a cancel RPC is sent and the execution is marked as failed with a
     *       message describing the unexpected state.
     * </ul>
     *
     * @param executionState the state the execution is expected to be in
     * @param executionState2 the state to switch to
     * @return {@code true} only if the transition was performed
     */
    private boolean switchTo(ExecutionState executionState, ExecutionState executionState2) {
        if (transitionState(executionState, executionState2)) {
            return true;
        }

        // The transition did not apply; inspect the state that is in effect now.
        final ExecutionState actual = this.state;

        final boolean alreadyDone = actual == ExecutionState.FINISHED || actual == ExecutionState.CANCELED;
        if (alreadyDone) {
            return false;
        }

        final boolean beingTornDown = actual == ExecutionState.CANCELING || actual == ExecutionState.FAILED;
        if (!beingTornDown) {
            final String message = String.format("Concurrent unexpected state transition of task %s from %s (expected %s) to %s while deployment was in progress.", getAttemptId(), actual, executionState, executionState2);
            LOG.debug(message);
            sendCancelRpcCall(3);
            markFailed(new Exception(message));
            return false;
        }

        // Guarded because getVertexWithAttempt() builds a string.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Concurrent canceling/failing of {} while deployment was in progress.", getVertexWithAttempt());
        }
        sendCancelRpcCall(3);
        return false;
    }

    /**
     * Sends a cancel request for this attempt to the TaskManager of the assigned slot, using
     * {@code FutureUtils.retry} with the given count. If the resulting future completes
     * exceptionally, this execution is failed. Does nothing when no slot is assigned.
     *
     * @param i the retry count handed to {@code FutureUtils.retry}
     */
    private void sendCancelRpcCall(int i) {
        final LogicalSlot slot = this.assignedResource;
        if (slot == null) {
            return;
        }

        final TaskManagerGateway gateway = slot.getTaskManagerGateway();
        final ComponentMainThreadExecutor mainThreadExecutor =
                getVertex().getExecutionGraphAccessor().getJobMasterMainThreadExecutor();

        FutureUtils.retry(() -> gateway.cancelTask(this.attemptId, this.rpcTimeout), i, mainThreadExecutor)
                .whenComplete((ack, failure) -> {
                    if (failure != null) {
                        fail(new Exception("Task could not be canceled.", failure));
                    }
                });
    }

    /**
     * Registers every given partition descriptor with the job's partition tracker.
     *
     * @param resourceID the resource the partitions are tracked under
     * @param collection the descriptors of the partitions to track
     */
    private void startTrackingPartitions(ResourceID resourceID, Collection<ResultPartitionDeploymentDescriptor> collection) {
        final JobMasterPartitionTracker tracker = this.vertex.getExecutionGraphAccessor().getPartitionTracker();
        for (ResultPartitionDeploymentDescriptor descriptor : collection) {
            tracker.startTrackingPartition(resourceID, descriptor);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Cleans up the partitions produced by this execution.
     *
     * <p>When {@code z} is set, a release RPC for intermediate result partitions is sent
     * first. Afterwards, if this execution produced any partitions, the partition tracker
     * either stops tracking and releases them ({@code z2 == true}) or merely stops tracking
     * them ({@code z2 == false}).
     *
     * @param z whether to send the release RPC for intermediate result partitions
     * @param z2 whether tracked partitions are released in addition to being untracked
     */
    public void handlePartitionCleanup(boolean z, boolean z2) {
        if (z) {
            sendReleaseIntermediateResultPartitionsRpcCall();
        }

        final Collection<ResultPartitionID> producedIds = getPartitionIds();
        final JobMasterPartitionTracker tracker = getVertex().getExecutionGraphAccessor().getPartitionTracker();

        if (!producedIds.isEmpty()) {
            if (z2) {
                LOG.info("Discarding the results produced by task execution {}.", this.attemptId);
                tracker.stopTrackingAndReleasePartitions(producedIds);
            } else {
                tracker.stopTrackingPartitions(producedIds);
            }
        }
    }

    /**
     * Collects the result partition IDs of all partitions produced by this execution.
     *
     * @return a newly collected list with one ID per entry of {@code producedPartitions}
     */
    private Collection<ResultPartitionID> getPartitionIds() {
        // No raw cast needed: the stream is fully typed once method references are used.
        return this.producedPartitions.values().stream()
                .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
                .map(ShuffleDescriptor::getResultPartitionID)
                .collect(Collectors.toList());
    }

    /**
     * Releases the produced partitions whose partition type reports
     * {@code isReleaseByUpstream()}.
     *
     * <p>Each such partition is first released through the shuffle master; the collected
     * partition IDs are then sent to the TaskManager of the assigned slot in one
     * {@code releasePartitions} call. Nothing beyond the log statement happens when no slot is
     * assigned, and no RPC is sent when no partition qualifies.
     */
    private void sendReleaseIntermediateResultPartitionsRpcCall() {
        LOG.info("Discarding the results produced by task execution {}.", this.attemptId);

        final LogicalSlot slot = this.assignedResource;
        if (slot == null) {
            return;
        }

        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
        final ShuffleMaster<? extends ShuffleDescriptor> shuffleMaster =
                getVertex().getExecutionGraphAccessor().getShuffleMaster();

        // The bound method reference null-checks shuffleMaster when it is evaluated, so no
        // separate explicit check is required.
        final Set<ResultPartitionID> partitionIds = this.producedPartitions.values().stream()
                .filter(descriptor -> descriptor.getPartitionType().isReleaseByUpstream())
                .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
                .peek(shuffleMaster::releasePartitionExternally)
                .map(ShuffleDescriptor::getResultPartitionID)
                .collect(Collectors.toSet());

        if (!partitionIds.isEmpty()) {
            taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds);
        }
    }

    /**
     * Sends the given partition infos to the TaskManager of the assigned slot. If the update
     * future completes exceptionally, this execution is failed; that callback is scheduled on
     * the job master main thread executor. Does nothing when no slot is assigned.
     *
     * @param iterable the partition infos to send
     */
    private void sendUpdatePartitionInfoRpcCall(Iterable<PartitionInfo> iterable) {
        final LogicalSlot slot = this.assignedResource;
        if (slot == null) {
            return;
        }

        final TaskManagerGateway gateway = slot.getTaskManagerGateway();
        final TaskManagerLocation location = slot.getTaskManagerLocation();

        final CompletableFuture<Acknowledge> updateFuture =
                gateway.updatePartitions(this.attemptId, iterable, this.rpcTimeout);
        final Executor mainThreadExecutor = getVertex().getExecutionGraphAccessor().getJobMasterMainThreadExecutor();

        updateFuture.whenCompleteAsync((ack, failure) -> {
            if (failure != null) {
                fail(new IllegalStateException("Update to task [" + getVertexWithAttempt() + "] on TaskManager " + location + " failed", failure));
            }
        }, mainThreadExecutor);
    }

    /**
     * Releases the slot assigned to this execution and completes {@code releaseFuture} with
     * the outcome. Without an assigned slot, {@code releaseFuture} is completed right away.
     *
     * <p>Asserts that it runs in the job master main thread; the completion callback asserts
     * the same.
     *
     * @param th the cause handed to {@code releaseSlot}, may be {@code null}
     */
    private void releaseAssignedResource(@Nullable Throwable th) {
        assertRunningInJobMasterMainThread();

        final LogicalSlot slot = this.assignedResource;
        if (slot == null) {
            this.releaseFuture.complete(null);
            return;
        }

        final ComponentMainThreadExecutor mainThreadExecutor =
                getVertex().getExecutionGraphAccessor().getJobMasterMainThreadExecutor();
        slot.releaseSlot(th).whenComplete((ignored, releaseFailure) -> {
            mainThreadExecutor.assertRunningInMainThread();
            if (releaseFailure == null) {
                this.releaseFuture.complete(null);
            } else {
                this.releaseFuture.completeExceptionally(releaseFailure);
            }
        });
    }

    /**
     * Switches this execution from whatever state it is currently in to the given state. The
     * outcome of the underlying transition is not reported to the caller.
     *
     * @param executionState the state to switch to
     */
    public void transitionState(ExecutionState executionState) {
        final ExecutionState current = this.state;
        transitionState(current, executionState);
    }

    /**
     * Attempts a state transition without an associated error.
     *
     * @param executionState the state the execution is expected to be in
     * @param executionState2 the state to switch to
     * @return {@code true} if the transition was performed
     */
    private boolean transitionState(ExecutionState executionState, ExecutionState executionState2) {
        final Throwable noError = null;
        return transitionState(executionState, executionState2, noError);
    }

    /**
     * Switches the state of this execution, provided it is still in the expected state.
     *
     * <p>On success the timestamps are updated, the transition is logged, the
     * initializing-or-running future (for {@code INITIALIZING} / {@code RUNNING}) or the
     * terminal state future (for terminal targets) is completed, and the vertex is notified.
     * Errors thrown by that notification are logged and do not affect the result.
     *
     * @param executionState the state the execution is expected to be in
     * @param executionState2 the state to switch to
     * @param th the error that caused the transition, or {@code null}; only used for logging
     * @return {@code true} if the state was switched, {@code false} if the execution was not
     *     in the expected state
     * @throws IllegalStateException if the expected state is terminal
     */
    private boolean transitionState(ExecutionState executionState, ExecutionState executionState2, Throwable th) {
        if (executionState.isTerminal()) {
            throw new IllegalStateException("Cannot leave terminal state " + executionState + " to transition to " + executionState2 + '.');
        }

        if (this.state != executionState) {
            return false;
        }

        this.state = executionState2;
        markTimestamp(executionState, executionState2);

        if (th != null) {
            if (LOG.isInfoEnabled()) {
                LOG.info("{} ({}) switched from {} to {} on {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), executionState, executionState2, getLocationInformation(), ExceptionUtils.stripCompletionException(th));
            }
        } else {
            LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), executionState, executionState2);
        }

        final boolean nowInitializingOrRunning =
                executionState2 == ExecutionState.INITIALIZING || executionState2 == ExecutionState.RUNNING;
        if (nowInitializingOrRunning) {
            this.initializingOrRunningFuture.complete(null);
        } else if (executionState2.isTerminal()) {
            this.terminalStateFuture.complete(executionState2);
        }

        // A failing listener must not undo or fail the transition itself.
        try {
            this.vertex.notifyStateTransition(this, executionState, executionState2);
        } catch (Throwable notificationError) {
            LOG.error("Error while notifying execution graph of execution state transition.", notificationError);
        }
        return true;
    }

    /**
     * Describes where this execution is placed.
     *
     * @return the string form of the assigned resource's TaskManager location, or
     *     {@code "[unassigned resource]"} when no resource is assigned
     */
    private String getLocationInformation() {
        if (this.assignedResource == null) {
            return "[unassigned resource]";
        }
        return this.assignedResource.getTaskManagerLocation().toString();
    }

    /**
     * Records one wall-clock instant as both the start timestamp of the new state and the end
     * timestamp of the previous state.
     *
     * @param executionState the state being left
     * @param executionState2 the state being entered
     */
    private void markTimestamp(ExecutionState executionState, ExecutionState executionState2) {
        final long now = System.currentTimeMillis();
        markTimestamp(executionState2, now);
        markEndTimestamp(executionState, now);
    }

    /**
     * Stores the given timestamp in {@code stateTimestamps} at the slot of the given state.
     *
     * @param executionState the state whose timestamp is set; its ordinal is the array index
     * @param j the timestamp value to store
     */
    private void markTimestamp(ExecutionState executionState, long j) {
        final int slot = executionState.ordinal();
        this.stateTimestamps[slot] = j;
    }

    /**
     * Stores the given timestamp in {@code stateEndTimestamps} at the slot of the given state.
     *
     * @param executionState the state whose end timestamp is set; its ordinal is the array index
     * @param j the timestamp value to store
     */
    private void markEndTimestamp(ExecutionState executionState, long j) {
        final int slot = executionState.ordinal();
        this.stateEndTimestamps[slot] = j;
    }

    /**
     * Builds a label made of the vertex's task name with subtask index and this execution's
     * attempt number.
     *
     * @return a string of the form {@code "<task name> - execution #<attempt number>"}
     */
    public String getVertexWithAttempt() {
        final String taskName = this.vertex.getTaskNameWithSubtaskIndex();
        return taskName + " - execution #" + getAttemptNumber();
    }

    /**
     * Replaces the user accumulators of this execution, unless it has already reached a
     * terminal state. The check and the assignment happen while holding
     * {@code accumulatorLock}.
     *
     * @param map the new user accumulators
     */
    public void setAccumulators(Map<String, Accumulator<?, ?>> map) {
        synchronized (this.accumulatorLock) {
            if (this.state.isTerminal()) {
                return;
            }
            this.userAccumulators = map;
        }
    }

    /**
     * Returns the user accumulators currently stored for this execution.
     *
     * @return the stored map itself (not a copy)
     */
    public Map<String, Accumulator<?, ?>> getUserAccumulators() {
        return userAccumulators;
    }

    /**
     * Returns the user accumulators in stringified form.
     *
     * <p>Each accumulator is wrapped with {@code OptionalFailure.of} before being handed to
     * {@code StringifiedAccumulatorResult.stringifyAccumulatorResults}; {@code null} is passed
     * when no accumulators are stored.
     *
     * @return the result of {@code stringifyAccumulatorResults} for the current accumulators
     */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecution
    public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
        // Read the field once so the null check and the traversal see the same map even if
        // the field is reassigned in between.
        final Map<String, Accumulator<?, ?>> snapshot = this.userAccumulators;
        final Map<String, OptionalFailure<Accumulator<?, ?>>> wrapped =
                snapshot == null
                        ? null
                        : snapshot.entrySet().stream()
                                .collect(Collectors.toMap(Map.Entry::getKey, entry -> OptionalFailure.of(entry.getValue())));
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(wrapped);
    }

    /**
     * Returns the parallel subtask index reported by this execution's vertex.
     *
     * @return the vertex's parallel subtask index
     */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecution
    public int getParallelSubtaskIndex() {
        final int subtaskIndex = getVertex().getParallelSubtaskIndex();
        return subtaskIndex;
    }

    /**
     * Returns the I/O metrics currently stored for this execution.
     *
     * @return the stored I/O metrics
     */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecution
    public IOMetrics getIOMetrics() {
        return ioMetrics;
    }

    /**
     * Stores the given accumulators and I/O metrics; a {@code null} argument leaves the
     * corresponding stored value untouched. The accumulators are assigned while holding
     * {@code accumulatorLock}.
     *
     * @param map the user accumulators to store, or {@code null} to keep the current ones
     * @param iOMetrics the I/O metrics to store, or {@code null} to keep the current ones
     */
    private void updateAccumulatorsAndMetrics(Map<String, Accumulator<?, ?>> map, IOMetrics iOMetrics) {
        final boolean hasAccumulators = map != null;
        if (hasAccumulators) {
            synchronized (this.accumulatorLock) {
                this.userAccumulators = map;
            }
        }

        final boolean hasMetrics = iOMetrics != null;
        if (hasMetrics) {
            this.ioMetrics = iOMetrics;
        }
    }

    /**
     * Renders this execution as
     * {@code "Attempt #<attempt number> (<task name>) @ <slot> - [<state>]"}, where the slot
     * part is {@code "(unassigned)"} when no resource is assigned.
     *
     * @return the textual description of this execution
     */
    @Override
    public String toString() {
        // Read the field once so the null check and the formatted value agree.
        final LogicalSlot slot = this.assignedResource;
        return String.format(
                "Attempt #%d (%s) @ %s - [%s]",
                getAttemptNumber(),
                this.vertex.getTaskNameWithSubtaskIndex(),
                slot == null ? "(unassigned)" : slot,
                this.state);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Creates an archived representation of this execution.
     *
     * @return a new {@code ArchivedExecution} constructed from this execution
     */
    @Override // org.apache.flink.api.common.Archiveable
    public ArchivedExecution archive() {
        final ArchivedExecution archived = new ArchivedExecution(this);
        return archived;
    }

    /**
     * Asserts, via the job master main thread executor, that the caller is running in the job
     * master main thread.
     */
    private void assertRunningInJobMasterMainThread() {
        final ComponentMainThreadExecutor mainThreadExecutor =
                this.vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor();
        mainThreadExecutor.assertRunningInMainThread();
    }
}
