package org.apache.flink.runtime.executiongraph;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.io.StrictlyLocalAssignment;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.util.SerializableObject;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionJobVertex.class */
/**
 * Execution-graph counterpart of a single {@link JobVertex}: it owns one {@link ExecutionVertex} per
 * parallel subtask, the {@link IntermediateResult}s those subtasks produce and, when the job vertex has
 * an input split source, the input splits and the assigner that hands them out.
 *
 * <p>The per-subtask "final state" bookkeeping ({@code finishedSubtasks}, {@code numSubtasksInFinalState})
 * is updated while holding {@code stateMonitor}.
 */
public class ExecutionJobVertex implements Serializable {
    private static final long serialVersionUID = 42;
    /** Reuses the logger of {@link ExecutionGraph}. */
    private static final Logger LOG = ExecutionGraph.LOG;
    /** Lock guarding the final-state bookkeeping; waiters are notified when the last subtask reaches a final state. */
    private final SerializableObject stateMonitor;
    /** The execution graph this vertex belongs to; never null. */
    private final ExecutionGraph graph;
    /** The job vertex this execution job vertex was created from; never null. */
    private final JobVertex jobVertex;
    /** One execution vertex per subtask, indexed by subtask index; length equals {@code parallelism}. */
    private final ExecutionVertex[] taskVertices;
    /** One intermediate result per data set produced by the job vertex; set to null by {@code prepareForArchiving()}. */
    private IntermediateResult[] producedDataSets;
    /** Intermediate results consumed by this vertex, filled by {@code connectToPredecessors} in job-edge order. */
    private final List<IntermediateResult> inputs;
    /** The job vertex's own parallelism if it is positive, otherwise the default handed to the constructor. */
    private final int parallelism;
    /** {@code finishedSubtasks[i]} is true once subtask {@code i} has been reported finished, cancelled or failed. */
    private final boolean[] finishedSubtasks;
    /** Number of true entries in {@code finishedSubtasks}; written under {@code stateMonitor}, read without it. */
    private volatile int numSubtasksInFinalState;
    /** Slot sharing group taken from the job vertex; may be null. */
    private final SlotSharingGroup slotSharingGroup;
    /** Co-location group taken from the job vertex; may be null, and requires a slot sharing group when set. */
    private final CoLocationGroup coLocationGroup;
    /** Splits created by the job vertex's input split source; null when there is no source or it returned null. */
    private final InputSplit[] inputSplits;
    /** Fixed split-to-subtask assignment; only non-null when the split source is a {@link StrictlyLocalAssignment}. */
    private List<LocatableInputSplit>[] inputSplitsPerSubtask;
    /** Hands out input splits to the subtasks; null when no splits were created or after archiving. */
    private InputSplitAssigner splitAssigner;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionJobVertex$PredeterminedInputSplitAssigner.class */
    /**
     * Split assigner for a fixed split-to-subtask mapping: every subtask only ever receives the splits
     * that were assigned to it up front, regardless of the requesting host.
     */
    public static class PredeterminedInputSplitAssigner implements InputSplitAssigner {
        /** Remaining splits per subtask index; each non-empty entry is a private, mutable copy. */
        private List<LocatableInputSplit>[] inputSplitsPerSubtask;

        /**
         * Copies the given assignment so that handing out splits never mutates the caller's lists.
         *
         * @param listArr splits per subtask index; null or empty entries mean "no splits for this subtask"
         */
        public PredeterminedInputSplitAssigner(List<LocatableInputSplit>[] listArr) {
            final int subtaskCount = listArr.length;
            this.inputSplitsPerSubtask = new List[subtaskCount];
            int subtask = 0;
            for (List<LocatableInputSplit> assigned : listArr) {
                if (assigned != null && !assigned.isEmpty()) {
                    this.inputSplitsPerSubtask[subtask] = new ArrayList<>(assigned);
                } else {
                    this.inputSplitsPerSubtask[subtask] = Collections.emptyList();
                }
                subtask++;
            }
        }

        /**
         * Removes and returns the last remaining split of subtask {@code i}.
         *
         * @param str the requesting host; ignored because the assignment is fixed
         * @param i the index of the requesting subtask
         * @return the next split of that subtask, or null once its splits are exhausted
         */
        @Override // org.apache.flink.core.io.InputSplitAssigner
        public InputSplit getNextInputSplit(String str, int i) {
            final List<LocatableInputSplit> remaining = this.inputSplitsPerSubtask[i];
            return remaining.isEmpty() ? null : remaining.remove(remaining.size() - 1);
        }
    }

    /**
     * Convenience constructor that delegates to the five-argument constructor, passing
     * {@link System#currentTimeMillis()} as the trailing {@code long} argument.
     *
     * @param executionGraph the execution graph this vertex belongs to
     * @param jobVertex the job vertex to create the subtasks for
     * @param i the parallelism to use when the job vertex does not declare a positive one
     * @param finiteDuration forwarded to every created {@link ExecutionVertex}
     * @throws JobException if the five-argument constructor rejects the job vertex
     */
    public ExecutionJobVertex(ExecutionGraph executionGraph, JobVertex jobVertex, int i, FiniteDuration finiteDuration) throws JobException {
        this(executionGraph, jobVertex, i, finiteDuration, System.currentTimeMillis());
    }

    /**
     * Creates the execution job vertex for {@code jobVertex}: determines the parallelism, creates one
     * {@link IntermediateResult} per produced data set and one {@link ExecutionVertex} per subtask, and,
     * if the job vertex has an input split source, creates the input splits and their assigner.
     *
     * @param executionGraph the execution graph this vertex belongs to; must not be null
     * @param jobVertex the job vertex to create the subtasks for; must not be null
     * @param i the parallelism to use when the job vertex does not declare a positive one
     * @param finiteDuration forwarded to every created {@link ExecutionVertex}
     * @param j forwarded to every created {@link ExecutionVertex}; the four-argument constructor passes
     *        the current system time in milliseconds
     * @throws NullPointerException if {@code executionGraph} or {@code jobVertex} is null
     * @throws JobException if a co-location group is used without a slot sharing group, or if creating
     *         the input splits or their assigner fails
     */
    public ExecutionJobVertex(ExecutionGraph executionGraph, JobVertex jobVertex, int i, FiniteDuration finiteDuration, long j) throws JobException {
        if (executionGraph == null) {
            throw new NullPointerException("executionGraph must not be null");
        }
        if (jobVertex == null) {
            throw new NullPointerException("jobVertex must not be null");
        }
        this.stateMonitor = new SerializableObject();
        this.graph = executionGraph;
        this.jobVertex = jobVertex;

        // A non-positive parallelism on the job vertex means "use the supplied default".
        final int declaredParallelism = jobVertex.getParallelism();
        final int numSubtasks = declaredParallelism > 0 ? declaredParallelism : i;
        this.parallelism = numSubtasks;
        this.taskVertices = new ExecutionVertex[numSubtasks];
        this.inputs = new ArrayList<>(jobVertex.getInputs().size());

        this.slotSharingGroup = jobVertex.getSlotSharingGroup();
        this.coLocationGroup = jobVertex.getCoLocationGroup();
        if (this.coLocationGroup != null && this.slotSharingGroup == null) {
            throw new JobException("Vertex uses a co-location constraint without using slot sharing");
        }

        // The intermediate results must exist before the execution vertices, which receive them on creation.
        this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
        for (int dataSetIndex = 0; dataSetIndex < jobVertex.getProducedDataSets().size(); dataSetIndex++) {
            IntermediateDataSet dataSet = jobVertex.getProducedDataSets().get(dataSetIndex);
            this.producedDataSets[dataSetIndex] = new IntermediateResult(dataSet.getId(), this, numSubtasks, dataSet.getResultType());
        }

        for (int subtask = 0; subtask < numSubtasks; subtask++) {
            this.taskVertices[subtask] = new ExecutionVertex(this, subtask, this.producedDataSets, finiteDuration, j);
        }

        // Sanity check: creating the execution vertices must have assigned one partition per subtask.
        for (IntermediateResult result : this.producedDataSets) {
            if (result.getNumberOfAssignedPartitions() != this.parallelism) {
                throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
            }
        }

        InputSplit[] createdSplits = null;
        try {
            // The split source is only known with a wildcard type here. Passing the splits it created
            // back into it is safe, but needs an explicit unchecked cast to type-check.
            @SuppressWarnings("unchecked")
            final InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
            if (splitSource != null) {
                createdSplits = splitSource.createInputSplits(numSubtasks);
                if (createdSplits != null) {
                    if (splitSource instanceof StrictlyLocalAssignment) {
                        // Strictly local sources get a fixed split-to-subtask mapping computed up front.
                        this.inputSplitsPerSubtask = computeLocalInputSplitsPerTask(createdSplits);
                        this.splitAssigner = new PredeterminedInputSplitAssigner(this.inputSplitsPerSubtask);
                    } else {
                        this.splitAssigner = splitSource.getInputSplitAssigner(createdSplits);
                    }
                }
            }
        } catch (Throwable th) {
            throw new JobException("Creating the input splits caused an error: " + th.getMessage(), th);
        }
        this.inputSplits = createdSplits;

        // Allocated outside the try block so that a failure here is not reported as an input split error.
        this.finishedSubtasks = new boolean[numSubtasks];
    }

    /**
     * @return the execution graph this vertex belongs to; never null
     */
    public ExecutionGraph getGraph() {
        return this.graph;
    }

    /**
     * @return the job vertex this execution job vertex was created from; never null
     */
    public JobVertex getJobVertex() {
        return this.jobVertex;
    }

    /**
     * @return the number of parallel subtasks, as fixed by the constructor
     */
    public int getParallelism() {
        return this.parallelism;
    }

    /**
     * @return the job ID reported by the owning execution graph
     */
    public JobID getJobId() {
        return this.graph.getJobID();
    }

    /**
     * @return the ID of the underlying job vertex
     */
    public JobVertexID getJobVertexId() {
        return this.jobVertex.getID();
    }

    /**
     * Returns the execution vertices of all subtasks, indexed by subtask index.
     *
     * @return the internal array itself, not a copy; callers must not modify it
     */
    public ExecutionVertex[] getTaskVertices() {
        return this.taskVertices;
    }

    /**
     * Returns the intermediate results produced by this vertex.
     *
     * @return the internal array itself, not a copy; null after {@code prepareForArchiving()} has run
     */
    public IntermediateResult[] getProducedDataSets() {
        return this.producedDataSets;
    }

    /**
     * @return the assigner handing out input splits to the subtasks, or null if no input splits were
     *         created or the vertex has been prepared for archiving
     */
    public InputSplitAssigner getSplitAssigner() {
        return this.splitAssigner;
    }

    /**
     * @return the slot sharing group taken from the job vertex; may be null
     */
    public SlotSharingGroup getSlotSharingGroup() {
        return this.slotSharingGroup;
    }

    /**
     * @return the co-location group taken from the job vertex; may be null
     */
    public CoLocationGroup getCoLocationGroup() {
        return this.coLocationGroup;
    }

    /**
     * Returns the intermediate results consumed by this vertex, in the order in which
     * {@code connectToPredecessors} registered them.
     *
     * @return the internal, mutable list itself; empty before connecting and after archiving
     */
    public List<IntermediateResult> getInputs() {
        return this.inputs;
    }

    /**
     * Checks whether every subtask has been reported as finished, cancelled or failed.
     * Reads the volatile counter without taking the state monitor.
     *
     * @return true if the number of subtasks in a final state equals the parallelism
     */
    public boolean isInFinalState() {
        return this.numSubtasksInFinalState == this.parallelism;
    }

    /**
     * Wires this vertex to the intermediate results it consumes. For every input edge of the job vertex,
     * the matching result is looked up by its data set ID, remembered in {@code inputs}, registered as
     * having one more consumer, and connected to each subtask.
     *
     * @param map the intermediate results created so far, keyed by data set ID
     * @throws JobException if an input edge references a data set ID that is not in {@code map}
     */
    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> map) throws JobException {
        final List<JobEdge> jobEdges = this.jobVertex.getInputs();

        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", this.jobVertex.getID(), this.jobVertex.getName(), jobEdges.size()));
        }

        for (int inputNumber = 0; inputNumber < jobEdges.size(); inputNumber++) {
            final JobEdge edge = jobEdges.get(inputNumber);

            if (LOG.isDebugEnabled()) {
                // An edge either carries its source data set directly or only references it by ID.
                if (edge.getSource() != null) {
                    LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).", inputNumber, this.jobVertex.getID(), this.jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
                } else {
                    LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.", inputNumber, this.jobVertex.getID(), this.jobVertex.getName(), edge.getSourceId()));
                }
            }

            final IntermediateResult source = map.get(edge.getSourceId());
            if (source == null) {
                throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID " + edge.getSourceId());
            }

            this.inputs.add(source);

            // The consumer number returned by the result is passed on to every subtask.
            final int consumerIndex = source.registerConsumer();
            for (ExecutionVertex subtaskVertex : this.taskVertices) {
                subtaskVertex.connectSource(inputNumber, source, edge, consumerIndex);
            }
        }
    }

    /**
     * Schedules all subtasks of this vertex for execution.
     *
     * <p>If the input splits were assigned strictly locally, every subtask that owns splits is first
     * pinned to a TaskManager instance on the host of its first split. Subtasks sharing a host are
     * spread round-robin over the instances available on that host.
     *
     * @param scheduler the scheduler that supplies the instances and places the subtasks
     * @param z forwarded unchanged to {@code ExecutionVertex.scheduleForExecution}
     * @throws NoResourceAvailableException if a strictly local subtask needs a host without any
     *         available TaskManager instance, or if scheduling a subtask fails for lack of resources
     */
    public void scheduleAll(Scheduler scheduler, boolean z) throws NoResourceAvailableException {
        final ExecutionVertex[] vertices = this.taskVertices;

        if (this.inputSplitsPerSubtask != null) {
            final Map<String, List<Instance>> instancesByHost = scheduler.getInstancesByHost();
            // Index of the instance most recently handed out on each host.
            final Map<String, Integer> lastInstanceIndexByHost = new HashMap<>();

            for (int subtask = 0; subtask < vertices.length; subtask++) {
                final List<LocatableInputSplit> splits = this.inputSplitsPerSubtask[subtask];
                if (splits == null || splits.isEmpty()) {
                    continue; // subtask received no splits, so it has no locality constraint
                }
                final String[] hostnames = splits.get(0).getHostnames();
                if (hostnames == null || hostnames.length == 0 || hostnames[0] == null) {
                    continue;
                }

                final String host = hostnames[0];
                final List<Instance> instances = instancesByHost.get(host);
                if (instances == null || instances.isEmpty()) {
                    throw new NoResourceAvailableException("Cannot schedule a strictly local task to host " + host + ". No TaskManager available on that host.");
                }

                // Advance to the next instance on this host, wrapping around at the end of the list.
                // The parentheses matter: '%' binds tighter than '+', so without them the index would
                // never wrap and could run past the end of the instance list.
                final Integer lastIndex = lastInstanceIndexByHost.get(host);
                final int instanceIndex = lastIndex == null ? 0 : (lastIndex + 1) % instances.size();
                lastInstanceIndexByHost.put(host, instanceIndex);

                vertices[subtask].setLocationConstraintHosts(Collections.singletonList(instances.get(instanceIndex)));
            }
        }

        for (ExecutionVertex vertex : vertices) {
            vertex.scheduleForExecution(scheduler, z);
        }
    }

    /**
     * Cancels every subtask of this vertex, in subtask-index order.
     */
    public void cancel() {
        final ExecutionVertex[] vertices = getTaskVertices();
        for (int subtask = 0; subtask < vertices.length; subtask++) {
            vertices[subtask].cancel();
        }
    }

    /**
     * Fails every subtask of this vertex, in subtask-index order, with the same cause.
     *
     * @param th the cause handed to each subtask
     */
    public void fail(Throwable th) {
        final ExecutionVertex[] vertices = getTaskVertices();
        for (int subtask = 0; subtask < vertices.length; subtask++) {
            vertices[subtask].fail(th);
        }
    }

    /**
     * Blocks the calling thread until every subtask has been reported as finished, cancelled or failed.
     *
     * @throws InterruptedException if the thread is interrupted while waiting on the state monitor
     */
    public void waitForAllVerticesToReachFinishingState() throws InterruptedException {
        synchronized (this.stateMonitor) {
            // Re-check the condition after every wake-up; the monitor is notified when the last
            // subtask reaches a final state.
            while (this.numSubtasksInFinalState < this.parallelism) {
                this.stateMonitor.wait();
            }
        }
    }

    /**
     * Resets this vertex so that it can be executed again: clears the slot sharing and co-location
     * state, resets every subtask and the final-state bookkeeping, re-creates the input split assigner
     * and resets the produced intermediate results.
     *
     * <p>May only be called when either no subtask or every subtask is in a final state.
     *
     * @throws IllegalStateException if only some of the subtasks are in a final state
     * @throws RuntimeException if the bookkeeping is inconsistent after the reset, or if re-creating the
     *         split assigner or resetting the intermediate results fails
     */
    public void resetForNewExecution() {
        if (this.numSubtasksInFinalState != 0 && this.numSubtasksInFinalState != this.parallelism) {
            throw new IllegalStateException("Cannot reset vertex that is not in final state");
        }
        synchronized (this.stateMonitor) {
            if (this.slotSharingGroup != null) {
                this.slotSharingGroup.clearTaskAssignment();
            }
            if (this.coLocationGroup != null) {
                this.coLocationGroup.resetConstraints();
            }

            for (int subtask = 0; subtask < this.parallelism; subtask++) {
                this.taskVertices[subtask].resetForNewExecution();
                if (this.finishedSubtasks[subtask]) {
                    this.finishedSubtasks[subtask] = false;
                    this.numSubtasksInFinalState--;
                }
            }
            if (this.numSubtasksInFinalState != 0) {
                throw new RuntimeException("Bug: resetting the execution job vertex failed.");
            }

            try {
                if (this.inputSplits != null) {
                    if (this.inputSplitsPerSubtask == null) {
                        // The split source is only known with a wildcard type. Handing it the splits it
                        // created itself is safe, but needs an explicit unchecked cast to type-check.
                        @SuppressWarnings("unchecked")
                        final InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) this.jobVertex.getInputSplitSource();
                        this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
                    } else {
                        // The previous predetermined assigner has handed out (and removed) its splits.
                        // Rebuild it from the untouched per-subtask lists so that the new execution
                        // receives its splits again.
                        this.splitAssigner = new PredeterminedInputSplitAssigner(this.inputSplitsPerSubtask);
                    }
                }
                for (IntermediateResult result : this.producedDataSets) {
                    result.resetForNewExecution();
                }
            } catch (Throwable th) {
                throw new RuntimeException("Re-creating the input split assigner failed: " + th.getMessage(), th);
            }
        }
    }

    /**
     * Drops the references that are only needed while the job runs: prepares every subtask for
     * archiving, clears inputs and produced results, resets the slot sharing and co-location state, and
     * releases the split assigner and the input splits.
     */
    public void prepareForArchiving() {
        final ExecutionVertex[] vertices = this.taskVertices;
        for (int subtask = 0; subtask < vertices.length; subtask++) {
            vertices[subtask].prepareForArchiving();
        }

        this.inputs.clear();
        this.producedDataSets = null;

        final SlotSharingGroup sharingGroup = this.slotSharingGroup;
        if (sharingGroup != null) {
            sharingGroup.clearTaskAssignment();
        }
        final CoLocationGroup locationGroup = this.coLocationGroup;
        if (locationGroup != null) {
            locationGroup.resetConstraints();
        }

        this.splitAssigner = null;

        // The array reference is final, so the splits are released by nulling out every element.
        final InputSplit[] splits = this.inputSplits;
        if (splits != null) {
            int remaining = splits.length;
            while (remaining > 0) {
                splits[--remaining] = null;
            }
        }
        this.inputSplitsPerSubtask = null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports that subtask {@code i} finished; recorded as the subtask having reached a final state.
     *
     * @param i the index of the subtask
     */
    public void vertexFinished(int i) {
        subtaskInFinalState(i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports that subtask {@code i} was cancelled; recorded as the subtask having reached a final state.
     *
     * @param i the index of the subtask
     */
    public void vertexCancelled(int i) {
        subtaskInFinalState(i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports that subtask {@code i} failed; recorded as the subtask having reached a final state.
     *
     * @param i the index of the subtask
     * @param th the failure cause; not used by this method
     */
    public void vertexFailed(int i, Throwable th) {
        subtaskInFinalState(i);
    }

    /**
     * Records that subtask {@code i} has reached a final state. When this is the last outstanding
     * subtask, the job vertex's {@code finalizeOnMaster} hook is run, waiting threads are woken up and
     * the execution graph is told that this job vertex is in its final state.
     *
     * <p>NOTE(review): the hook runs no matter whether the subtasks finished, were cancelled or failed,
     * and both the hook and the graph callbacks are invoked while holding the state monitor - confirm
     * that this is intended.
     *
     * @param i the index of the subtask
     */
    private void subtaskInFinalState(int i) {
        synchronized (this.stateMonitor) {
            // Repeated reports for the same subtask are ignored, so every subtask is counted once.
            if (!this.finishedSubtasks[i]) {
                this.finishedSubtasks[i] = true;
                if (this.numSubtasksInFinalState + 1 == this.parallelism) {
                    // Last outstanding subtask: run the finalization hook before publishing the count.
                    try {
                        getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
                    } catch (Throwable th) {
                        // A failing hook fails the whole graph; the bookkeeping below still runs.
                        getGraph().fail(th);
                    }
                    this.numSubtasksInFinalState++;
                    this.stateMonitor.notifyAll();
                    this.graph.jobVertexInFinalState(this);
                } else {
                    this.numSubtasksInFinalState++;
                }
            }
        }
    }

    /**
     * Computes a fixed assignment of input splits to subtasks in which every subtask only receives
     * splits located on a single host.
     *
     * <p>The splits are grouped by their (single) hostname. The hosts are processed in sorted order and
     * the subtasks are divided evenly among them, with the first hosts receiving one extra subtask when
     * the division leaves a remainder. Each host then spreads its splits evenly over the subtasks it was
     * given. A host with fewer splits than subtasks uses only as many subtasks as it has splits, so
     * trailing entries of the returned array may stay null.
     *
     * @param inputSplitArr all input splits of this vertex
     * @return the splits per subtask, indexed by subtask index; entries may be null
     * @throws JobException if there are fewer splits than subtasks, if a split is not a
     *         {@link LocatableInputSplit} with exactly one non-null hostname, or if there are fewer
     *         subtasks than distinct hosts
     */
    private List<LocatableInputSplit>[] computeLocalInputSplitsPerTask(InputSplit[] inputSplitArr) throws JobException {
        final int numSubtasks = getParallelism();
        if (numSubtasks > inputSplitArr.length) {
            throw new JobException("Strictly local assignment requires at least as many splits as subtasks.");
        }

        // Group the splits by the one host each of them is located on.
        final Map<String, List<LocatableInputSplit>> splitsByHost = new HashMap<>();
        for (InputSplit split : inputSplitArr) {
            if (!(split instanceof LocatableInputSplit)) {
                throw new JobException("Invalid InputSplit type " + split.getClass().getCanonicalName() + ". Strictly local assignment requires LocatableInputSplit");
            }
            final LocatableInputSplit locatable = (LocatableInputSplit) split;
            final String[] hosts = locatable.getHostnames();
            if (hosts == null) {
                throw new JobException("LocatableInputSplit has no host information. Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
            }
            if (hosts.length != 1) {
                throw new JobException("Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
            }
            final String host = hosts[0];
            if (host == null) {
                throw new JobException("For strictly local input split assignment, no null host names are allowed.");
            }

            List<LocatableInputSplit> splitsOnHost = splitsByHost.get(host);
            if (splitsOnHost == null) {
                splitsOnHost = new ArrayList<>();
                splitsByHost.put(host, splitsOnHost);
            }
            splitsOnHost.add(locatable);
        }

        final int numHosts = splitsByHost.size();
        if (numSubtasks < numHosts) {
            throw new JobException("Strictly local split assignment requires at least as many parallel subtasks as distinct split hosts. Please increase the parallelism of DataSource " + getJobVertex().getName() + " to at least " + numHosts + ".");
        }

        // Sorting the hosts makes the assignment deterministic.
        final List<String> sortedHosts = new ArrayList<>(splitsByHost.keySet());
        Collections.sort(sortedHosts);

        @SuppressWarnings("unchecked")
        final List<LocatableInputSplit>[] splitsPerSubtask = new List[numSubtasks];
        final int baseSubtasksPerHost = numSubtasks / numHosts;
        final int hostsWithExtraSubtask = numSubtasks % numHosts;

        int nextSubtask = 0;
        for (int hostIndex = 0; hostIndex < numHosts; hostIndex++) {
            final List<LocatableInputSplit> hostSplits = splitsByHost.get(sortedHosts.get(hostIndex));
            final int numSplits = hostSplits.size();

            // A host never uses more subtasks than it has splits.
            final int subtaskQuota = hostIndex < hostsWithExtraSubtask ? baseSubtasksPerHost + 1 : baseSubtasksPerHost;
            final int subtasksForHost = Math.min(numSplits, subtaskQuota);
            final int baseSplitsPerSubtask = numSplits / subtasksForHost;
            final int subtasksWithExtraSplit = numSplits % subtasksForHost;

            int cursor = 0;
            for (int k = 0; k < subtasksForHost; k++) {
                final int chunkSize = k < subtasksWithExtraSplit ? baseSplitsPerSubtask + 1 : baseSplitsPerSubtask;
                final List<LocatableInputSplit> assigned;
                if (chunkSize == numSplits) {
                    // A single subtask takes all splits of the host: reuse the host's list as is.
                    assigned = hostSplits;
                } else {
                    assigned = new ArrayList<>(chunkSize);
                    for (final int end = cursor + chunkSize; cursor < end; cursor++) {
                        assigned.add(hostSplits.get(cursor));
                    }
                }
                splitsPerSubtask[nextSubtask++] = assigned;
            }
        }
        return splitsPerSubtask;
    }
}
