package org.apache.flink.streaming.api;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/StreamingJobGraphGenerator.class */
public class StreamingJobGraphGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
    private StreamGraph streamGraph;
    private Map<Integer, AbstractJobVertex> streamVertices;
    private JobGraph jobGraph;
    private Collection<Integer> builtVertices;
    private List<StreamEdge> physicalEdgesInOrder;
    private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
    private Map<Integer, StreamConfig> vertexConfigs;
    private Map<Integer, String> chainedNames;

    public StreamingJobGraphGenerator(StreamGraph streamGraph) {
        this.streamGraph = streamGraph;
    }

    private void init() {
        this.streamVertices = new HashMap();
        this.builtVertices = new HashSet();
        this.chainedConfigs = new HashMap();
        this.vertexConfigs = new HashMap();
        this.chainedNames = new HashMap();
        this.physicalEdgesInOrder = new ArrayList();
    }

    public JobGraph createJobGraph(String str) {
        this.jobGraph = new JobGraph(str);
        this.jobGraph.setScheduleMode(ScheduleMode.ALL);
        this.jobGraph.setJobType(JobGraph.JobType.STREAMING);
        this.jobGraph.setCheckpointingEnabled(this.streamGraph.isCheckpointingEnabled());
        this.jobGraph.setCheckpointingInterval(this.streamGraph.getCheckpointingInterval());
        if (this.jobGraph.isCheckpointingEnabled()) {
            int numberOfExecutionRetries = this.streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
            if (numberOfExecutionRetries != -1) {
                this.jobGraph.setNumberOfExecutionRetries(numberOfExecutionRetries);
            } else {
                this.jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
            }
        }
        init();
        setChaining();
        setPhysicalEdges();
        setSlotSharing();
        return this.jobGraph;
    }

    private void setPhysicalEdges() {
        HashMap hashMap = new HashMap();
        for (StreamEdge streamEdge : this.physicalEdgesInOrder) {
            int targetVertex = streamEdge.getTargetVertex();
            List list = (List) hashMap.get(Integer.valueOf(targetVertex));
            if (list == null) {
                list = new ArrayList();
                hashMap.put(Integer.valueOf(targetVertex), list);
            }
            list.add(streamEdge);
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            int intValue = ((Integer) entry.getKey()).intValue();
            this.vertexConfigs.get(Integer.valueOf(intValue)).setInPhysicalEdges((List) entry.getValue());
        }
    }

    private void setChaining() {
        for (Integer num : this.streamGraph.getSources()) {
            createChain(num, num);
        }
    }

    private List<StreamEdge> createChain(Integer num, Integer num2) {
        if (this.builtVertices.contains(num)) {
            return new ArrayList();
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        for (StreamEdge streamEdge : this.streamGraph.getOutEdges(num2)) {
            if (isChainable(streamEdge)) {
                arrayList2.add(streamEdge);
            } else {
                arrayList3.add(streamEdge);
            }
        }
        Iterator<StreamEdge> it = arrayList2.iterator();
        while (it.hasNext()) {
            arrayList.addAll(createChain(num, Integer.valueOf(it.next().getTargetVertex())));
        }
        for (StreamEdge streamEdge2 : arrayList3) {
            arrayList.add(streamEdge2);
            createChain(Integer.valueOf(streamEdge2.getTargetVertex()), Integer.valueOf(streamEdge2.getTargetVertex()));
        }
        this.chainedNames.put(num2, createChainedName(num2, arrayList2));
        StreamConfig createProcessingVertex = num2.equals(num) ? createProcessingVertex(num) : new StreamConfig(new Configuration());
        setVertexConfig(num2, createProcessingVertex, arrayList2, arrayList3);
        if (num2.equals(num)) {
            createProcessingVertex.setChainStart();
            createProcessingVertex.setOutEdgesInOrder(arrayList);
            createProcessingVertex.setOutEdges(this.streamGraph.getOutEdges(num2));
            Iterator<StreamEdge> it2 = arrayList.iterator();
            while (it2.hasNext()) {
                connect(num, it2.next());
            }
            createProcessingVertex.setTransitiveChainedTaskConfigs(this.chainedConfigs.get(num));
        } else {
            if (this.chainedConfigs.get(num) == null) {
                this.chainedConfigs.put(num, new HashMap());
            }
            this.chainedConfigs.get(num).put(num2, createProcessingVertex);
        }
        return arrayList;
    }

    private String createChainedName(Integer num, List<StreamEdge> list) {
        String operatorName = this.streamGraph.getOperatorName(num);
        if (list.size() <= 1) {
            return list.size() == 1 ? operatorName + " -> " + this.chainedNames.get(Integer.valueOf(list.get(0).getTargetVertex())) : operatorName;
        }
        ArrayList arrayList = new ArrayList();
        Iterator<StreamEdge> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(this.chainedNames.get(Integer.valueOf(it.next().getTargetVertex())));
        }
        return operatorName + " -> (" + StringUtils.join(arrayList, ", ") + ")";
    }

    private StreamConfig createProcessingVertex(Integer num) {
        AbstractJobVertex abstractJobVertex = new AbstractJobVertex(this.chainedNames.get(num));
        abstractJobVertex.setInvokableClass(this.streamGraph.getJobVertexClass(num));
        if (this.streamGraph.getParallelism(num) > 0) {
            abstractJobVertex.setParallelism(this.streamGraph.getParallelism(num));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", Integer.valueOf(this.streamGraph.getParallelism(num)), num);
        }
        if (this.streamGraph.getInputFormat(num) != null) {
            abstractJobVertex.setInputSplitSource(this.streamGraph.getInputFormat(num));
        }
        this.streamVertices.put(num, abstractJobVertex);
        this.builtVertices.add(num);
        this.jobGraph.addVertex(abstractJobVertex);
        StreamConfig streamConfig = new StreamConfig(abstractJobVertex.getConfiguration());
        streamConfig.setOperatorName(this.chainedNames.get(num));
        return streamConfig;
    }

    private void setVertexConfig(Integer num, StreamConfig streamConfig, List<StreamEdge> list, List<StreamEdge> list2) {
        streamConfig.setVertexID(num);
        streamConfig.setBufferTimeout(this.streamGraph.getBufferTimeout(num));
        streamConfig.setTypeSerializerIn1(this.streamGraph.getInSerializer1(num));
        streamConfig.setTypeSerializerIn2(this.streamGraph.getInSerializer2(num));
        streamConfig.setTypeSerializerOut1(this.streamGraph.getOutSerializer1(num));
        streamConfig.setTypeSerializerOut2(this.streamGraph.getOutSerializer2(num));
        streamConfig.setUserInvokable(this.streamGraph.getInvokable(num));
        streamConfig.setOutputSelectorWrapper(this.streamGraph.getOutputSelectorWrapper(num));
        streamConfig.setNumberOfOutputs(list2.size());
        streamConfig.setNonChainedOutputs(list2);
        streamConfig.setChainedOutputs(list);
        streamConfig.setStateMonitoring(this.streamGraph.isCheckpointingEnabled());
        Class<? extends AbstractInvokable> jobVertexClass = this.streamGraph.getJobVertexClass(num);
        if (jobVertexClass.equals(StreamIterationHead.class) || jobVertexClass.equals(StreamIterationTail.class)) {
            streamConfig.setIterationId(this.streamGraph.getIterationID(num));
            streamConfig.setIterationWaitTime(this.streamGraph.getIterationTimeout(num));
        }
        ArrayList<StreamEdge> arrayList = new ArrayList(list);
        arrayList.addAll(list2);
        for (StreamEdge streamEdge : arrayList) {
            streamConfig.setSelectedNames(Integer.valueOf(streamEdge.getTargetVertex()), this.streamGraph.getEdge(num, Integer.valueOf(streamEdge.getTargetVertex())).getSelectedNames());
        }
        this.vertexConfigs.put(num, streamConfig);
    }

    private void connect(Integer num, StreamEdge streamEdge) {
        this.physicalEdgesInOrder.add(streamEdge);
        Integer valueOf = Integer.valueOf(streamEdge.getTargetVertex());
        AbstractJobVertex abstractJobVertex = this.streamVertices.get(num);
        AbstractJobVertex abstractJobVertex2 = this.streamVertices.get(valueOf);
        StreamConfig streamConfig = new StreamConfig(abstractJobVertex2.getConfiguration());
        streamConfig.setNumberOfInputs(streamConfig.getNumberOfInputs() + 1);
        StreamPartitioner<?> partitioner = streamEdge.getPartitioner();
        if (partitioner.getStrategy() == StreamPartitioner.PartitioningStrategy.FORWARD) {
            abstractJobVertex2.connectNewDataSetAsInput(abstractJobVertex, DistributionPattern.POINTWISE);
        } else {
            abstractJobVertex2.connectNewDataSetAsInput(abstractJobVertex, DistributionPattern.ALL_TO_ALL);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}", new Object[]{partitioner.getClass().getSimpleName(), num, valueOf});
        }
    }

    private boolean isChainable(StreamEdge streamEdge) {
        int sourceVertex = streamEdge.getSourceVertex();
        int targetVertex = streamEdge.getTargetVertex();
        StreamInvokable<?, ?> invokable = this.streamGraph.getInvokable(Integer.valueOf(sourceVertex));
        StreamInvokable<?, ?> invokable2 = this.streamGraph.getInvokable(Integer.valueOf(targetVertex));
        return this.streamGraph.getInEdges(Integer.valueOf(targetVertex)).size() == 1 && invokable2 != null && invokable2.getChainingStrategy() == StreamInvokable.ChainingStrategy.ALWAYS && (invokable.getChainingStrategy() == StreamInvokable.ChainingStrategy.HEAD || invokable.getChainingStrategy() == StreamInvokable.ChainingStrategy.ALWAYS) && ((streamEdge.getPartitioner().getStrategy() == StreamPartitioner.PartitioningStrategy.FORWARD || this.streamGraph.getParallelism(Integer.valueOf(targetVertex)) == 1) && this.streamGraph.getParallelism(Integer.valueOf(sourceVertex)) == this.streamGraph.getParallelism(Integer.valueOf(targetVertex)) && this.streamGraph.chaining);
    }

    private void setSlotSharing() {
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        Iterator<AbstractJobVertex> it = this.streamVertices.values().iterator();
        while (it.hasNext()) {
            it.next().setSlotSharingGroup(slotSharingGroup);
        }
        for (Integer num : this.streamGraph.getIterationIDs()) {
            CoLocationGroup coLocationGroup = new CoLocationGroup();
            AbstractJobVertex abstractJobVertex = this.streamVertices.get(this.streamGraph.getIterationTail(num.intValue()));
            coLocationGroup.addVertex(this.streamVertices.get(this.streamGraph.getIterationHead(num.intValue())));
            coLocationGroup.addVertex(abstractJobVertex);
        }
    }
}
