package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/StateAssignmentOperation.class */
public class StateAssignmentOperation {
    private final Logger logger;
    private final Map<JobVertexID, ExecutionJobVertex> tasks;
    private final Map<JobVertexID, TaskState> taskStates;
    private final boolean allowNonRestoredState;

    public StateAssignmentOperation(Logger logger, Map<JobVertexID, ExecutionJobVertex> map, Map<JobVertexID, TaskState> map2, boolean z) {
        this.logger = (Logger) Preconditions.checkNotNull(logger);
        this.tasks = (Map) Preconditions.checkNotNull(map);
        this.taskStates = (Map) Preconditions.checkNotNull(map2);
        this.allowNonRestoredState = z;
    }

    public boolean assignStates() throws Exception {
        boolean z = false;
        Map<JobVertexID, ExecutionJobVertex> map = this.tasks;
        for (Map.Entry<JobVertexID, TaskState> entry : this.taskStates.entrySet()) {
            TaskState value = entry.getValue();
            ExecutionJobVertex executionJobVertex = map.get(entry.getKey());
            if (executionJobVertex == null && !z) {
                map = ExecutionJobVertex.includeLegacyJobVertexIDs(map);
                executionJobVertex = map.get(entry.getKey());
                z = true;
                this.logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
            }
            if (executionJobVertex != null) {
                checkParallelismPreconditions(value, executionJobVertex);
                assignTaskStatesToOperatorInstances(value, executionJobVertex);
            } else {
                if (!this.allowNonRestoredState) {
                    throw new IllegalStateException("There is no execution job vertex for the job vertex ID " + entry.getKey());
                }
                this.logger.info("Skipped checkpoint state for operator {}.", value.getJobVertexID());
            }
        }
        return true;
    }

    private void checkParallelismPreconditions(TaskState taskState, ExecutionJobVertex executionJobVertex) {
        if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
            if (executionJobVertex.isMaxParallelismConfigured()) {
                throw new IllegalStateException("The maximum parallelism (" + taskState.getMaxParallelism() + ") with which the latest checkpoint of the execution job vertex " + executionJobVertex + " has been taken and the current maximum parallelism (" + executionJobVertex.getMaxParallelism() + ") changed. This is currently not supported.");
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Overriding maximum parallelism for JobVertex " + executionJobVertex.getJobVertexId() + " from " + executionJobVertex.getMaxParallelism() + " to " + taskState.getMaxParallelism());
            }
            executionJobVertex.setMaxParallelism(taskState.getMaxParallelism());
        }
        int parallelism = taskState.getParallelism();
        int parallelism2 = executionJobVertex.getParallelism();
        if (taskState.hasNonPartitionedState() && parallelism != parallelism2) {
            throw new IllegalStateException("Cannot restore the latest checkpoint because the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned state and its parallelism changed. The operator " + executionJobVertex.getJobVertexId() + " has parallelism " + parallelism2 + " whereas the corresponding state object has a parallelism of " + parallelism);
        }
    }

    private static void assignTaskStatesToOperatorInstances(TaskState taskState, ExecutionJobVertex executionJobVertex) {
        List<KeyGroupsStateHandle> keyGroupsStateHandles;
        List<KeyGroupsStateHandle> keyGroupsStateHandles2;
        int parallelism = taskState.getParallelism();
        int parallelism2 = executionJobVertex.getParallelism();
        List<KeyGroupRange> createKeyGroupPartitions = createKeyGroupPartitions(executionJobVertex.getMaxParallelism(), parallelism2);
        int chainLength = taskState.getChainLength();
        List[] listArr = new List[chainLength];
        List[] listArr2 = new List[chainLength];
        ArrayList arrayList = new ArrayList(parallelism);
        ArrayList arrayList2 = new ArrayList(parallelism);
        for (int i = 0; i < parallelism; i++) {
            SubtaskState state = taskState.getState(i);
            if (null != state) {
                collectParallelStatesByChainOperator(listArr, state.getManagedOperatorState());
                collectParallelStatesByChainOperator(listArr2, state.getRawOperatorState());
                KeyGroupsStateHandle managedKeyedState = state.getManagedKeyedState();
                if (null != managedKeyedState) {
                    arrayList.add(managedKeyedState);
                }
                KeyGroupsStateHandle rawKeyedState = state.getRawKeyedState();
                if (null != rawKeyedState) {
                    arrayList2.add(rawKeyedState);
                }
            }
        }
        List[] listArr3 = new List[chainLength];
        List[] listArr4 = new List[chainLength];
        OperatorStateRepartitioner operatorStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
        for (int i2 = 0; i2 < chainLength; i2++) {
            List list = listArr[i2];
            List list2 = listArr2[i2];
            listArr3[i2] = applyRepartitioner(operatorStateRepartitioner, list, parallelism, parallelism2);
            listArr4[i2] = applyRepartitioner(operatorStateRepartitioner, list2, parallelism, parallelism2);
        }
        for (int i3 = 0; i3 < parallelism2; i3++) {
            ChainedStateHandle<StreamStateHandle> chainedStateHandle = null;
            if (parallelism == parallelism2 && taskState.getState(i3) != null) {
                chainedStateHandle = taskState.getState(i3).getLegacyOperatorState();
            }
            Collection[] collectionArr = new Collection[chainLength];
            Collection[] collectionArr2 = new Collection[chainLength];
            List asList = Arrays.asList(collectionArr);
            List asList2 = Arrays.asList(collectionArr2);
            for (int i4 = 0; i4 < listArr3.length; i4++) {
                List list3 = listArr3[i4];
                List list4 = listArr4[i4];
                if (list3 != null) {
                    asList.set(i4, list3.get(i3));
                }
                if (list4 != null) {
                    asList2.set(i4, list4.get(i3));
                }
            }
            Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[i3].getCurrentExecutionAttempt();
            if (parallelism == parallelism2) {
                SubtaskState state2 = taskState.getState(i3);
                if (state2 != null) {
                    KeyGroupsStateHandle managedKeyedState2 = state2.getManagedKeyedState();
                    KeyGroupsStateHandle rawKeyedState2 = state2.getRawKeyedState();
                    keyGroupsStateHandles = managedKeyedState2 != null ? Collections.singletonList(managedKeyedState2) : null;
                    keyGroupsStateHandles2 = rawKeyedState2 != null ? Collections.singletonList(rawKeyedState2) : null;
                } else {
                    keyGroupsStateHandles = null;
                    keyGroupsStateHandles2 = null;
                }
            } else {
                KeyGroupRange keyGroupRange = createKeyGroupPartitions.get(i3);
                keyGroupsStateHandles = getKeyGroupsStateHandles(arrayList, keyGroupRange);
                keyGroupsStateHandles2 = getKeyGroupsStateHandles(arrayList2, keyGroupRange);
            }
            currentExecutionAttempt.setInitialState(new TaskStateHandles(chainedStateHandle, asList, asList2, keyGroupsStateHandles, keyGroupsStateHandles2));
        }
    }

    public static List<KeyGroupsStateHandle> getKeyGroupsStateHandles(Collection<KeyGroupsStateHandle> collection, KeyGroupRange keyGroupRange) {
        ArrayList arrayList = new ArrayList();
        Iterator<KeyGroupsStateHandle> it = collection.iterator();
        while (it.hasNext()) {
            KeyGroupsStateHandle keyGroupIntersection = it.next().getKeyGroupIntersection(keyGroupRange);
            if (keyGroupIntersection.getNumberOfKeyGroups() > 0) {
                arrayList.add(keyGroupIntersection);
            }
        }
        return arrayList;
    }

    public static List<KeyGroupRange> createKeyGroupPartitions(int i, int i2) {
        Preconditions.checkArgument(i >= i2);
        ArrayList arrayList = new ArrayList(i2);
        for (int i3 = 0; i3 < i2; i3++) {
            arrayList.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(i, i2, i3));
        }
        return arrayList;
    }

    private static void collectParallelStatesByChainOperator(List<OperatorStateHandle>[] listArr, ChainedStateHandle<OperatorStateHandle> chainedStateHandle) {
        if (null != chainedStateHandle) {
            int length = chainedStateHandle.getLength();
            Preconditions.checkState(length >= listArr.length, "Found more states than operators in the chain. Chain length: " + length + ", States: " + listArr.length);
            for (int i = 0; i < listArr.length; i++) {
                OperatorStateHandle operatorStateHandle = chainedStateHandle.get(i);
                if (null != operatorStateHandle) {
                    List<OperatorStateHandle> list = listArr[i];
                    if (null == list) {
                        list = new ArrayList();
                        listArr[i] = list;
                    }
                    list.add(operatorStateHandle);
                }
            }
        }
    }

    private static List<Collection<OperatorStateHandle>> applyRepartitioner(OperatorStateRepartitioner operatorStateRepartitioner, List<OperatorStateHandle> list, int i, int i2) {
        if (list == null) {
            return null;
        }
        if (i2 != i) {
            return operatorStateRepartitioner.repartitionState(list, i2);
        }
        ArrayList arrayList = new ArrayList(i2);
        for (OperatorStateHandle operatorStateHandle : list) {
            Iterator<OperatorStateHandle.StateMetaInfo> it = operatorStateHandle.getStateNameToPartitionOffsets().values().iterator();
            while (it.hasNext()) {
                if (OperatorStateHandle.Mode.BROADCAST.equals(it.next().getDistributionMode())) {
                    return operatorStateRepartitioner.repartitionState(list, i2);
                }
            }
            arrayList.add(Collections.singletonList(operatorStateHandle));
        }
        return arrayList;
    }
}
