package org.apache.flink.streaming.api;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.invokable.SourceInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
import org.apache.flink.streaming.api.streamvertex.StreamVertex;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.state.OperatorState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/JobGraphBuilder.class */
/**
 * Collects the description of a streaming topology and turns it into a runtime {@link JobGraph}.
 *
 * <p>Vertices, edges, serializers, iteration settings and operator states are first recorded in
 * maps keyed by vertex name. Nothing is handed to the runtime until {@link #getJobGraph(String)}
 * is called, at which point a fresh {@link JobGraph} is created and populated from those maps.
 *
 * <p>The class performs no synchronization of its own.
 */
public class JobGraphBuilder {
    private static final Logger LOG = LoggerFactory.getLogger(JobGraphBuilder.class);
    private static final String DEFAULT_JOB_NAME = "Streaming Job";

    /** Graph under construction; replaced on every call to {@link #getJobGraph(String)}. */
    private JobGraph jobGraph;

    // Runtime vertices created by createVertex(), keyed by vertex name.
    private final Map<String, AbstractJobVertex> streamVertices = new HashMap<>();
    private final Map<String, Integer> vertexParallelism = new HashMap<>();
    private final Map<String, Long> bufferTimeout = new HashMap<>();

    // Per-vertex outgoing edge descriptions. The lists of one vertex are index-aligned:
    // entry k of each list describes the k-th edge registered through setEdge().
    private final Map<String, List<String>> outEdgeList = new HashMap<>();
    private final Map<String, List<Integer>> outEdgeType = new HashMap<>();
    private final Map<String, List<List<String>>> outEdgeNames = new HashMap<>();
    private final Map<String, List<Boolean>> outEdgeSelectAll = new HashMap<>();
    private final Map<String, List<String>> inEdgeList = new HashMap<>();
    private final Map<String, List<StreamPartitioner<?>>> connectionTypes = new HashMap<>();

    private final Map<String, String> operatorNames = new HashMap<>();
    private final Map<String, StreamInvokable<?, ?>> invokableObjects = new HashMap<>();
    private final Map<String, StreamRecordSerializer<?>> typeSerializersIn1 = new HashMap<>();
    private final Map<String, StreamRecordSerializer<?>> typeSerializersIn2 = new HashMap<>();
    private final Map<String, StreamRecordSerializer<?>> typeSerializersOut1 = new HashMap<>();
    private final Map<String, StreamRecordSerializer<?>> typeSerializersOut2 = new HashMap<>();
    private final Map<String, byte[]> outputSelectors = new HashMap<>();
    private final Map<String, Class<? extends AbstractInvokable>> vertexClasses = new HashMap<>();

    // Iteration bookkeeping: vertex name -> iteration ID, and iteration ID -> head/tail name.
    private final Map<String, Integer> iterationIds = new HashMap<>();
    private final Map<Integer, String> iterationIDtoHeadName = new HashMap<>();
    private final Map<Integer, String> iterationIDtoTailName = new HashMap<>();
    private final Map<String, Integer> iterationTailCount = new HashMap<>();
    private final Map<String, Long> iterationWaitTime = new HashMap<>();

    private final Map<String, Map<String, OperatorState<?>>> operatorStates = new HashMap<>();
    private final Map<String, InputFormat<String, ?>> inputFormatList = new HashMap<>();

    /** Creates an empty builder. */
    public JobGraphBuilder() {
        LOG.debug("JobGraph created");
    }

    /**
     * Registers a vertex that will be executed as a {@link StreamVertex}.
     *
     * @param str name of the vertex
     * @param streamInvokable user invokable to run in the vertex
     * @param typeInformation input type; when {@code null} no input serializer is recorded
     * @param typeInformation2 output type; when {@code null} no output serializer is recorded
     * @param str2 operator name stored for the vertex
     * @param i parallelism of the vertex
     */
    public <IN, OUT> void addStreamVertex(String str, StreamInvokable<IN, OUT> streamInvokable, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str2, int i) {
        addVertex(str, StreamVertex.class, streamInvokable, str2, i);

        StreamRecordSerializer<?> inSerializer = null;
        if (typeInformation != null) {
            inSerializer = new StreamRecordSerializer<>(typeInformation);
        }
        StreamRecordSerializer<?> outSerializer = null;
        if (typeInformation2 != null) {
            outSerializer = new StreamRecordSerializer<>(typeInformation2);
        }
        addTypeSerializers(str, inSerializer, null, outSerializer, null);

        LOG.debug("Vertex: {}", str);
    }

    /**
     * Registers a source vertex by wrapping the function in a {@link SourceInvokable} and
     * delegating to {@link #addStreamVertex}.
     *
     * @param str name of the vertex
     * @param sourceFunction source function to wrap
     * @param typeInformation input type, may be {@code null}
     * @param typeInformation2 output type, may be {@code null}
     * @param str2 operator name stored for the vertex
     * @param bArr not used by this method
     * @param i parallelism of the vertex
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <IN, OUT> void addSourceVertex(String str, SourceFunction<OUT> sourceFunction, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str2, byte[] bArr, int i) {
        // Raw construction is kept on purpose: the invokable's declared type parameters are not
        // visible here, so parameterizing it could break compilation against the real class.
        addStreamVertex(str, new SourceInvokable(sourceFunction), typeInformation, typeInformation2, str2, i);
    }

    /**
     * Registers the head vertex of an iteration and connects it to {@code str2}.
     *
     * <p>The head copies the operator name and output serializers of {@code str2} (see
     * {@link #setBytesFrom}). The new edge reuses the partitioner of the first outgoing edge of the
     * first registered input of {@code str2}, so {@code str2} must already have at least one input.
     *
     * @param str name of the iteration head vertex
     * @param str2 name of the already registered vertex the head feeds into
     * @param num iteration ID
     * @param i parallelism of the head vertex
     * @param j iteration wait time stored for the head
     */
    public void addIterationHead(String str, String str2, Integer num, int i, long j) {
        addVertex(str, StreamIterationHead.class, null, null, i);
        this.iterationIds.put(str, num);
        this.iterationIDtoHeadName.put(num, str);
        setBytesFrom(str2, str);

        String firstInputOfTarget = this.inEdgeList.get(str2).get(0);
        StreamPartitioner<?> partitioner = this.connectionTypes.get(firstInputOfTarget).get(0);
        setEdge(str, str2, partitioner, 0, new ArrayList<String>(), false);

        this.iterationWaitTime.put(str, j);
        LOG.debug("ITERATION SOURCE: {}", str);
    }

    /**
     * Registers the tail vertex of an iteration.
     *
     * <p>The tail copies the operator name and output serializers of {@code str2} (see
     * {@link #setBytesFrom}). No edge is created here.
     *
     * @param str name of the iteration tail vertex
     * @param str2 name of an already registered vertex; a buffer timeout must have been set for it
     * @param num iteration ID
     * @param i parallelism of the tail vertex
     * @param j iteration wait time stored for the tail
     * @throws RuntimeException if the buffer timeout of {@code str2} is 0
     */
    public void addIterationTail(String str, String str2, Integer num, int i, long j) {
        if (this.bufferTimeout.get(str2).longValue() == 0) {
            throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
        }
        addVertex(str, StreamIterationTail.class, null, null, i);
        this.iterationIds.put(str, num);
        this.iterationIDtoTailName.put(num, str);
        setBytesFrom(str2, str);
        this.iterationWaitTime.put(str, j);
        LOG.debug("ITERATION SINK: {}", str);
    }

    /**
     * Registers a two-input vertex that will be executed as a {@link CoStreamVertex}.
     *
     * @param str name of the vertex
     * @param coInvokable user invokable to run in the vertex
     * @param typeInformation type of the first input
     * @param typeInformation2 type of the second input
     * @param typeInformation3 output type
     * @param str2 operator name stored for the vertex
     * @param i parallelism of the vertex
     */
    public <IN1, IN2, OUT> void addCoTask(String str, CoInvokable<IN1, IN2, OUT> coInvokable, TypeInformation<IN1> typeInformation, TypeInformation<IN2> typeInformation2, TypeInformation<OUT> typeInformation3, String str2, int i) {
        addVertex(str, CoStreamVertex.class, coInvokable, str2, i);
        addTypeSerializers(str, new StreamRecordSerializer<>(typeInformation), new StreamRecordSerializer<>(typeInformation2), new StreamRecordSerializer<>(typeInformation3), null);
        LOG.debug("CO-TASK: {}", str);
    }

    /**
     * Records the basic properties of a vertex and initialises its (empty) edge lists.
     *
     * @param str name of the vertex
     * @param cls runtime task class of the vertex
     * @param streamInvokable user invokable, may be {@code null}
     * @param str2 operator name, may be {@code null}
     * @param i parallelism of the vertex
     */
    private void addVertex(String str, Class<? extends AbstractInvokable> cls, StreamInvokable<?, ?> streamInvokable, String str2, int i) {
        this.vertexClasses.put(str, cls);
        setParallelism(str, i);
        this.invokableObjects.put(str, streamInvokable);
        this.operatorNames.put(str, str2);
        this.outEdgeList.put(str, new ArrayList<String>());
        this.outEdgeType.put(str, new ArrayList<Integer>());
        this.outEdgeNames.put(str, new ArrayList<List<String>>());
        this.outEdgeSelectAll.put(str, new ArrayList<Boolean>());
        this.inEdgeList.put(str, new ArrayList<String>());
        this.connectionTypes.put(str, new ArrayList<StreamPartitioner<?>>());
        this.iterationTailCount.put(str, 0);
    }

    /** Stores the four (possibly {@code null}) serializers of the vertex named {@code str}. */
    private void addTypeSerializers(String str, StreamRecordSerializer<?> streamRecordSerializer, StreamRecordSerializer<?> streamRecordSerializer2, StreamRecordSerializer<?> streamRecordSerializer3, StreamRecordSerializer<?> streamRecordSerializer4) {
        this.typeSerializersIn1.put(str, streamRecordSerializer);
        this.typeSerializersIn2.put(str, streamRecordSerializer2);
        this.typeSerializersOut1.put(str, streamRecordSerializer3);
        this.typeSerializersOut2.put(str, streamRecordSerializer4);
    }

    /**
     * Creates the runtime vertex for {@code str}, adds it to the current job graph and writes the
     * recorded settings into its configuration.
     *
     * <p>A parallelism and a buffer timeout must have been recorded for the vertex; iteration
     * heads and tails additionally need an iteration wait time.
     *
     * @param str name of a registered vertex
     */
    private void createVertex(String str) {
        Class<? extends AbstractInvokable> taskClass = this.vertexClasses.get(str);
        int parallelism = this.vertexParallelism.get(str).intValue();

        AbstractJobVertex vertex = new AbstractJobVertex(str);
        this.jobGraph.addVertex(vertex);
        vertex.setInvokableClass(taskClass);
        vertex.setParallelism(parallelism);
        LOG.debug("Parallelism set: {} for {}", parallelism, str);

        StreamConfig config = new StreamConfig(vertex.getConfiguration());
        config.setBufferTimeout(this.bufferTimeout.get(str).longValue());
        config.setTypeSerializerIn1(this.typeSerializersIn1.get(str));
        config.setTypeSerializerIn2(this.typeSerializersIn2.get(str));
        config.setTypeSerializerOut1(this.typeSerializersOut1.get(str));
        config.setTypeSerializerOut2(this.typeSerializersOut2.get(str));
        config.setUserInvokable(this.invokableObjects.get(str));
        config.setVertexName(str);
        config.setOutputSelector(this.outputSelectors.get(str));
        config.setOperatorStates(this.operatorStates.get(str));

        boolean isIterationEndpoint = taskClass.equals(StreamIterationHead.class) || taskClass.equals(StreamIterationTail.class);
        if (isIterationEndpoint) {
            config.setIterationId(this.iterationIds.get(str));
            config.setIterationWaitTime(this.iterationWaitTime.get(str).longValue());
        }

        if (this.inputFormatList.containsKey(str)) {
            vertex.setInputSplitSource(this.inputFormatList.get(str));
        }
        this.streamVertices.put(str, vertex);
    }

    /**
     * Connects two already created runtime vertices and stores the settings of the new output in
     * the upstream vertex's configuration.
     *
     * <p>A FORWARD partitioner yields a POINTWISE connection; every other strategy yields a
     * BIPARTITE one.
     *
     * @param str name of the upstream vertex
     * @param str2 name of the downstream vertex
     * @param streamPartitioner partitioner used on the edge
     */
    private <T> void connect(String str, String str2, StreamPartitioner<T> streamPartitioner) {
        AbstractJobVertex upstream = this.streamVertices.get(str);
        AbstractJobVertex downstream = this.streamVertices.get(str2);
        StreamConfig upstreamConfig = new StreamConfig(upstream.getConfiguration());

        if (streamPartitioner.getStrategy() == StreamPartitioner.PartitioningStrategy.FORWARD) {
            downstream.connectNewDataSetAsInput(upstream, DistributionPattern.POINTWISE);
        } else {
            downstream.connectNewDataSetAsInput(upstream, DistributionPattern.BIPARTITE);
        }

        // Guard kept: it avoids the array allocation and class-name lookup when debug is off.
        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}", new Object[]{streamPartitioner.getClass().getSimpleName(), str, str2});
        }

        // The data set that was just produced is the last one, so its index is count - 1.
        int outputIndex = upstream.getNumberOfProducedIntermediateDataSets() - 1;
        upstreamConfig.setOutputName(outputIndex, this.outEdgeNames.get(str).get(outputIndex));
        upstreamConfig.setSelectAll(outputIndex, this.outEdgeSelectAll.get(str).get(outputIndex));
        upstreamConfig.setPartitioner(outputIndex, streamPartitioner);
        upstreamConfig.setNumberOfOutputChannels(outputIndex, this.vertexParallelism.get(str2));
    }

    /**
     * Sets the parallelism of a vertex.
     *
     * @param str name of the vertex
     * @param i parallelism to record
     */
    public void setParallelism(String str, int i) {
        this.vertexParallelism.put(str, i);
    }

    /**
     * Sets the input format used as input split source of a vertex.
     *
     * @param str name of the vertex
     * @param inputFormat input format to record
     */
    public void setInputFormat(String str, InputFormat<String, ?> inputFormat) {
        this.inputFormatList.put(str, inputFormat);
    }

    /**
     * Sets the buffer timeout of a vertex.
     *
     * @param str name of the vertex
     * @param j buffer timeout to record
     */
    public void setBufferTimeout(String str, long j) {
        this.bufferTimeout.put(str, j);
    }

    /**
     * Registers a named operator state for a vertex.
     *
     * @param str name of the vertex
     * @param str2 name of the state
     * @param operatorState state to register
     * @throws RuntimeException if a state named {@code str2} is already registered for the vertex
     */
    public void addOperatorState(String str, String str2, OperatorState<?> operatorState) {
        Map<String, OperatorState<?>> statesOfVertex = this.operatorStates.get(str);
        if (statesOfVertex == null) {
            statesOfVertex = new HashMap<>();
            this.operatorStates.put(str, statesOfVertex);
        } else if (statesOfVertex.containsKey(str2)) {
            throw new RuntimeException("State has already been registered with this name: " + str2);
        }
        statesOfVertex.put(str2, operatorState);
    }

    /**
     * Records a directed edge between two registered vertices.
     *
     * @param str name of the upstream vertex
     * @param str2 name of the downstream vertex
     * @param streamPartitioner partitioner used on the edge
     * @param i type index of the edge, later written as the input type of the downstream vertex
     * @param list output names selected on the edge
     * @param z select-all flag stored for the edge
     */
    public void setEdge(String str, String str2, StreamPartitioner<?> streamPartitioner, int i, List<String> list, boolean z) {
        this.outEdgeList.get(str).add(str2);
        this.outEdgeType.get(str).add(i);
        this.inEdgeList.get(str2).add(str);
        this.connectionTypes.get(str).add(streamPartitioner);
        this.outEdgeNames.get(str).add(list);
        this.outEdgeSelectAll.get(str).add(z);
    }

    /**
     * Copies the parallelism and buffer timeout of vertex {@code str2} to the head of an iteration.
     *
     * <p>Iteration heads are keyed by {@link Integer} iteration ID, so the textual ID is parsed
     * before the lookup; looking the map up with the {@code String} itself can never match.
     *
     * @param str iteration ID in decimal notation
     * @param str2 name of a registered vertex with a parallelism and a buffer timeout
     * @throws NumberFormatException if {@code str} is not a parsable integer
     * @throws IllegalStateException if no iteration head is registered for the ID
     */
    public void setIterationSourceSettings(String str, String str2) {
        Integer iterationId = Integer.valueOf(str);
        String headName = this.iterationIDtoHeadName.get(iterationId);
        if (headName == null) {
            throw new IllegalStateException("No iteration head registered for iteration ID: " + str);
        }
        setParallelism(headName, this.vertexParallelism.get(str2).intValue());
        setBufferTimeout(headName, this.bufferTimeout.get(str2).longValue());
    }

    /**
     * Sets the serialized output selector of a vertex.
     *
     * @param str name of the vertex
     * @param bArr serialized output selector
     */
    public <T> void setOutputSelector(String str, byte[] bArr) {
        this.outputSelectors.put(str, bArr);
        LOG.debug("Outputselector set for {}", str);
    }

    /**
     * Replaces the user invokable of a vertex.
     *
     * @param str name of the vertex
     * @param streamInvokable invokable to record
     */
    public <IN, OUT> void setInvokable(String str, StreamInvokable<IN, OUT> streamInvokable) {
        this.invokableObjects.put(str, streamInvokable);
    }

    /**
     * Returns the user invokable recorded for a vertex.
     *
     * @param str name of the vertex
     * @return the recorded invokable, or {@code null} if none is recorded
     */
    public StreamInvokable<?, ?> getInvokable(String str) {
        return this.invokableObjects.get(str);
    }

    /**
     * Replaces the first output serializer of a vertex with one built from the given type.
     *
     * @param str name of the vertex
     * @param typeInformation output type
     */
    public <OUT> void setOutType(String str, TypeInformation<OUT> typeInformation) {
        this.typeSerializersOut1.put(str, new StreamRecordSerializer<>(typeInformation));
    }

    /**
     * Copies the operator name and output serializers of one vertex to another.
     *
     * <p>Both the input and the output serializers of {@code str2} are set to the output
     * serializers of {@code str}.
     *
     * @param str name of the vertex to copy from
     * @param str2 name of the vertex to copy to
     */
    public void setBytesFrom(String str, String str2) {
        StreamRecordSerializer<?> sourceOut1 = this.typeSerializersOut1.get(str);
        StreamRecordSerializer<?> sourceOut2 = this.typeSerializersOut2.get(str);

        this.operatorNames.put(str2, this.operatorNames.get(str));
        this.typeSerializersIn1.put(str2, sourceOut1);
        this.typeSerializersIn2.put(str2, sourceOut2);
        this.typeSerializersOut1.put(str2, sourceOut1);
        this.typeSerializersOut2.put(str2, sourceOut2);
    }

    /**
     * Puts all vertices into one slot sharing group and builds a co-location group from the head
     * and tail vertex of every iteration.
     */
    private void setSlotSharing() {
        SlotSharingGroup sharedSlots = new SlotSharingGroup();
        for (AbstractJobVertex vertex : this.streamVertices.values()) {
            vertex.setSlotSharingGroup(sharedSlots);
        }

        // De-duplicate: head and tail of one iteration map to the same ID.
        for (Integer iterationId : new HashSet<Integer>(this.iterationIds.values())) {
            AbstractJobVertex head = this.streamVertices.get(this.iterationIDtoHeadName.get(iterationId));
            AbstractJobVertex tail = this.streamVertices.get(this.iterationIDtoTailName.get(iterationId));

            // NOTE(review): the group is not stored anywhere by this method. Whether addVertex
            // alone is enough to make the vertices aware of the constraint depends on
            // CoLocationGroup - confirm against the runtime API.
            CoLocationGroup coLocation = new CoLocationGroup();
            coLocation.addVertex(head);
            coLocation.addVertex(tail);
        }
    }

    /** Writes the number of inputs of every runtime vertex into its stream configuration. */
    private void setNumberOfJobInputs() {
        for (AbstractJobVertex vertex : this.streamVertices.values()) {
            StreamConfig config = new StreamConfig(vertex.getConfiguration());
            config.setNumberOfInputs(vertex.getNumberOfInputs());
        }
    }

    /** Writes the number of produced data sets of every runtime vertex into its configuration. */
    private void setNumberOfJobOutputs() {
        for (AbstractJobVertex vertex : this.streamVertices.values()) {
            StreamConfig config = new StreamConfig(vertex.getConfiguration());
            config.setNumberOfOutputs(vertex.getNumberOfProducedIntermediateDataSets());
        }
    }

    /**
     * Builds the job graph under the default job name.
     *
     * @return the newly built job graph
     */
    public JobGraph getJobGraph() {
        return getJobGraph(DEFAULT_JOB_NAME);
    }

    /**
     * Builds a new job graph from everything registered so far.
     *
     * @param str name of the job
     * @return the newly built job graph
     */
    public JobGraph getJobGraph(String str) {
        this.jobGraph = new JobGraph(str);
        buildJobGraph();
        return this.jobGraph;
    }

    /**
     * Creates all runtime vertices, wires up every recorded edge, then applies slot sharing and
     * the final input/output counts.
     */
    private void buildJobGraph() {
        // All vertices must exist before any edge is wired, because connect() looks both ends up.
        for (String vertexName : this.outEdgeList.keySet()) {
            createVertex(vertexName);
        }

        for (String upstreamName : this.outEdgeList.keySet()) {
            List<String> targets = this.outEdgeList.get(upstreamName);
            List<Integer> edgeTypes = this.outEdgeType.get(upstreamName);
            List<StreamPartitioner<?>> partitioners = this.connectionTypes.get(upstreamName);

            for (int edge = 0; edge < targets.size(); edge++) {
                String downstreamName = targets.get(edge);
                StreamConfig downstreamConfig = new StreamConfig(this.streamVertices.get(downstreamName).getConfiguration());

                // The configured input count doubles as the index of the next free input slot.
                int inputIndex = downstreamConfig.getNumberOfInputs();
                downstreamConfig.setInputType(inputIndex, edgeTypes.get(edge));
                downstreamConfig.setNumberOfInputs(inputIndex + 1);

                connect(upstreamName, downstreamName, partitioners.get(edge));
            }
        }

        setSlotSharing();
        setNumberOfJobInputs();
        setNumberOfJobOutputs();
    }
}
