package org.apache.flink.streaming.api;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
import org.apache.flink.streaming.api.streamvertex.StreamVertex;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.sling.commons.json.JSONArray;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.commons.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/StreamGraph.class */
public class StreamGraph extends StreamingPlan {
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
    private static final String DEAFULT_JOB_NAME = "Flink Streaming Job";
    private Map<Integer, Integer> operatorParallelisms;
    private Map<Integer, Long> bufferTimeouts;
    private StreamEdgeList edges;
    private Map<Integer, List<OutputSelector<?>>> outputSelectors;
    private Map<Integer, String> operatorNames;
    private Map<Integer, StreamInvokable<?, ?>> invokableObjects;
    private Map<Integer, StreamRecordSerializer<?>> typeSerializersIn1;
    private Map<Integer, StreamRecordSerializer<?>> typeSerializersIn2;
    private Map<Integer, StreamRecordSerializer<?>> typeSerializersOut1;
    private Map<Integer, StreamRecordSerializer<?>> typeSerializersOut2;
    private Map<Integer, Class<? extends AbstractInvokable>> jobVertexClasses;
    private Map<Integer, Integer> iterationIds;
    private Map<Integer, Integer> iterationIDtoHeadID;
    private Map<Integer, Integer> iterationIDtoTailID;
    private Map<Integer, Integer> iterationTailCount;
    private Map<Integer, Long> iterationTimeouts;
    private Map<Integer, InputFormat<String, ?>> inputFormatLists;
    private List<Map<Integer, ?>> containingMaps;
    private Set<Integer> sources;
    private ExecutionConfig executionConfig;
    private boolean checkpointingEnabled;
    protected boolean chaining = true;
    private String jobName = DEAFULT_JOB_NAME;
    private long checkpointingInterval = 5000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/StreamGraph$JSONGenerator.class */
    public class JSONGenerator {
        public static final String STEPS = "step_function";
        public static final String ID = "id";
        public static final String SIDE = "side";
        public static final String SHIP_STRATEGY = "ship_strategy";
        public static final String PREDECESSORS = "predecessors";
        public static final String TYPE = "type";
        public static final String PACT = "pact";
        public static final String CONTENTS = "contents";
        public static final String PARALLELISM = "parallelism";

        private JSONGenerator() {
        }

        public String getJSON() throws JSONException {
            JSONObject jSONObject = new JSONObject();
            JSONArray jSONArray = new JSONArray();
            jSONObject.put("nodes", jSONArray);
            ArrayList arrayList = new ArrayList(StreamGraph.this.operatorNames.keySet());
            Collections.sort(arrayList);
            visit(jSONArray, arrayList, new HashMap());
            return jSONObject.toString();
        }

        private void visit(JSONArray jSONArray, List<Integer> list, Map<Integer, Integer> map) throws JSONException {
            Integer num = list.get(0);
            if (StreamGraph.this.getSources().contains(num) || Collections.disjoint(StreamGraph.this.getInEdges(num), list)) {
                JSONObject jSONObject = new JSONObject();
                decorateNode(num, jSONObject);
                if (!StreamGraph.this.getSources().contains(num)) {
                    JSONArray jSONArray2 = new JSONArray();
                    jSONObject.put(PREDECESSORS, jSONArray2);
                    Iterator<StreamEdge> it = StreamGraph.this.getInEdges(num).iterator();
                    while (it.hasNext()) {
                        int sourceVertex = it.next().getSourceVertex();
                        decorateEdge(jSONArray2, num.intValue(), Integer.valueOf(map.keySet().contains(Integer.valueOf(sourceVertex)) ? map.get(Integer.valueOf(sourceVertex)).intValue() : sourceVertex).intValue(), sourceVertex);
                    }
                }
                jSONArray.put(jSONObject);
                list.remove(num);
            } else {
                Integer num2 = -1;
                Iterator<StreamEdge> it2 = StreamGraph.this.getInEdges(num).iterator();
                while (it2.hasNext()) {
                    int sourceVertex2 = it2.next().getSourceVertex();
                    if (StreamGraph.this.iterationIds.keySet().contains(Integer.valueOf(sourceVertex2))) {
                        num2 = Integer.valueOf(sourceVertex2);
                    }
                }
                JSONObject jSONObject2 = new JSONObject();
                JSONArray jSONArray3 = new JSONArray();
                jSONObject2.put(STEPS, jSONArray3);
                jSONObject2.put(ID, num2);
                jSONObject2.put(PACT, "IterativeDataStream");
                jSONObject2.put(PARALLELISM, StreamGraph.this.getParallelism(num2));
                jSONObject2.put(CONTENTS, "Stream Iteration");
                JSONArray jSONArray4 = new JSONArray();
                jSONObject2.put(PREDECESSORS, jSONArray4);
                list.remove(num2);
                visitIteration(jSONArray3, list, num2.intValue(), map, jSONArray4);
                jSONArray.put(jSONObject2);
            }
            if (list.isEmpty()) {
                return;
            }
            visit(jSONArray, list, map);
        }

        private void visitIteration(JSONArray jSONArray, List<Integer> list, int i, Map<Integer, Integer> map, JSONArray jSONArray2) throws JSONException {
            Integer num = list.get(0);
            list.remove(num);
            if (StreamGraph.this.iterationIds.containsKey(num)) {
                return;
            }
            JSONObject jSONObject = new JSONObject();
            jSONArray.put(jSONObject);
            decorateNode(num, jSONObject);
            JSONArray jSONArray3 = new JSONArray();
            jSONObject.put(PREDECESSORS, jSONArray3);
            Iterator<StreamEdge> it = StreamGraph.this.getInEdges(num).iterator();
            while (it.hasNext()) {
                int sourceVertex = it.next().getSourceVertex();
                if (map.keySet().contains(Integer.valueOf(sourceVertex))) {
                    decorateEdge(jSONArray3, num.intValue(), sourceVertex, sourceVertex);
                } else if (!StreamGraph.this.iterationIds.containsKey(Integer.valueOf(sourceVertex))) {
                    decorateEdge(jSONArray2, num.intValue(), sourceVertex, sourceVertex);
                }
            }
            map.put(num, Integer.valueOf(i));
            visitIteration(jSONArray, list, i, map, jSONArray2);
        }

        private void decorateEdge(JSONArray jSONArray, int i, int i2, int i3) throws JSONException {
            JSONObject jSONObject = new JSONObject();
            jSONArray.put(jSONObject);
            jSONObject.put(ID, i2);
            jSONObject.put(SHIP_STRATEGY, StreamGraph.this.edges.getEdge(i3, i).getPartitioner().getStrategy());
            jSONObject.put(SIDE, jSONArray.length() == 0 ? "first" : "second");
        }

        private void decorateNode(Integer num, JSONObject jSONObject) throws JSONException {
            jSONObject.put(ID, num);
            jSONObject.put(TYPE, StreamGraph.this.getOperatorName(num));
            if (StreamGraph.this.sources.contains(num)) {
                jSONObject.put(PACT, "Data Source");
            } else {
                jSONObject.put(PACT, "Data Stream");
            }
            if (StreamGraph.this.getInvokable(num) == null || StreamGraph.this.getInvokable(num).getUserFunction() == null) {
                jSONObject.put(CONTENTS, StreamGraph.this.getOperatorName(num));
            } else {
                jSONObject.put(CONTENTS, StreamGraph.this.getOperatorName(num) + " at " + StreamGraph.this.getInvokable(num).getUserFunction().getClass().getSimpleName());
            }
            jSONObject.put(PARALLELISM, StreamGraph.this.getParallelism(num));
        }
    }

    public StreamGraph(ExecutionConfig executionConfig) {
        this.executionConfig = executionConfig;
        initGraph();
        if (LOG.isDebugEnabled()) {
            LOG.debug("StreamGraph created");
        }
    }

    public void initGraph() {
        this.containingMaps = new ArrayList();
        this.operatorParallelisms = new HashMap();
        this.containingMaps.add(this.operatorParallelisms);
        this.bufferTimeouts = new HashMap();
        this.edges = new StreamEdgeList();
        this.operatorNames = new HashMap();
        this.containingMaps.add(this.operatorNames);
        this.invokableObjects = new HashMap();
        this.containingMaps.add(this.invokableObjects);
        this.typeSerializersIn1 = new HashMap();
        this.containingMaps.add(this.typeSerializersIn1);
        this.typeSerializersIn2 = new HashMap();
        this.containingMaps.add(this.typeSerializersIn2);
        this.typeSerializersOut1 = new HashMap();
        this.containingMaps.add(this.typeSerializersOut1);
        this.typeSerializersOut2 = new HashMap();
        this.containingMaps.add(this.typeSerializersOut1);
        this.outputSelectors = new HashMap();
        this.containingMaps.add(this.outputSelectors);
        this.jobVertexClasses = new HashMap();
        this.containingMaps.add(this.jobVertexClasses);
        this.iterationIds = new HashMap();
        this.containingMaps.add(this.jobVertexClasses);
        this.iterationIDtoHeadID = new HashMap();
        this.iterationIDtoTailID = new HashMap();
        this.iterationTailCount = new HashMap();
        this.containingMaps.add(this.iterationTailCount);
        this.iterationTimeouts = new HashMap();
        this.containingMaps.add(this.iterationTailCount);
        this.inputFormatLists = new HashMap();
        this.containingMaps.add(this.inputFormatLists);
        this.sources = new HashSet();
    }

    public <IN, OUT> void addStreamVertex(Integer num, StreamInvokable<IN, OUT> streamInvokable, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str, int i) {
        addVertex(num, StreamVertex.class, streamInvokable, str, i);
        addTypeSerializers(num, typeInformation != null ? new StreamRecordSerializer<>(typeInformation, this.executionConfig) : null, null, (typeInformation2 == null || (typeInformation2 instanceof MissingTypeInfo)) ? null : new StreamRecordSerializer<>(typeInformation2, this.executionConfig), null);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex: {}", num);
        }
    }

    public <IN, OUT> void addSourceVertex(Integer num, StreamInvokable<IN, OUT> streamInvokable, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str, int i) {
        addStreamVertex(num, streamInvokable, typeInformation, typeInformation2, str, i);
        this.sources.add(num);
    }

    public void addIterationHead(Integer num, Integer num2, Integer num3, int i, long j) {
        addVertex(num, StreamIterationHead.class, null, null, i);
        this.chaining = false;
        this.iterationIds.put(num, num3);
        this.iterationIDtoHeadID.put(num3, num);
        setSerializersFrom(num2, num);
        setEdge(num, num2, this.edges.getOutEdges(this.edges.getInEdgeIndices(num2.intValue()).get(0).intValue()).get(0).getPartitioner(), 0, new ArrayList());
        this.iterationTimeouts.put(this.iterationIDtoHeadID.get(num3), Long.valueOf(j));
        if (LOG.isDebugEnabled()) {
            LOG.debug("ITERATION SOURCE: {}", num);
        }
        this.sources.add(num);
    }

    public void addIterationTail(Integer num, Integer num2, Integer num3, long j) {
        if (this.bufferTimeouts.get(num2).longValue() == 0) {
            throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
        }
        addVertex(num, StreamIterationTail.class, null, null, getParallelism(num2));
        this.iterationIds.put(num, num3);
        this.iterationIDtoTailID.put(num3, num);
        setSerializersFrom(num2, num);
        this.iterationTimeouts.put(this.iterationIDtoTailID.get(num3), Long.valueOf(j));
        setParallelism(this.iterationIDtoHeadID.get(num3), getParallelism(num2));
        setBufferTimeout(this.iterationIDtoHeadID.get(num3), this.bufferTimeouts.get(num2).longValue());
        if (LOG.isDebugEnabled()) {
            LOG.debug("ITERATION SINK: {}", num);
        }
    }

    public <IN1, IN2, OUT> void addCoTask(Integer num, CoInvokable<IN1, IN2, OUT> coInvokable, TypeInformation<IN1> typeInformation, TypeInformation<IN2> typeInformation2, TypeInformation<OUT> typeInformation3, String str, int i) {
        addVertex(num, CoStreamVertex.class, coInvokable, str, i);
        addTypeSerializers(num, new StreamRecordSerializer<>(typeInformation, this.executionConfig), new StreamRecordSerializer<>(typeInformation2, this.executionConfig), (typeInformation3 == null || (typeInformation3 instanceof MissingTypeInfo)) ? null : new StreamRecordSerializer<>(typeInformation3, this.executionConfig), null);
        if (LOG.isDebugEnabled()) {
            LOG.debug("CO-TASK: {}", num);
        }
    }

    private void addVertex(Integer num, Class<? extends AbstractInvokable> cls, StreamInvokable<?, ?> streamInvokable, String str, int i) {
        this.jobVertexClasses.put(num, cls);
        setParallelism(num, i);
        this.invokableObjects.put(num, streamInvokable);
        this.operatorNames.put(num, str);
        this.edges.addVertex(num.intValue());
        this.outputSelectors.put(num, new ArrayList());
        this.iterationTailCount.put(num, 0);
    }

    public void setEdge(Integer num, Integer num2, StreamPartitioner<?> streamPartitioner, int i, List<String> list) {
        this.edges.addEdge(new StreamEdge(num.intValue(), num2.intValue(), i, list, streamPartitioner));
    }

    public void removeEdge(Integer num, Integer num2) {
        this.edges.removeEdge(num.intValue(), num2.intValue());
    }

    public void removeVertex(Integer num) {
        this.edges.removeVertex(num.intValue());
        Iterator<Map<Integer, ?>> it = this.containingMaps.iterator();
        while (it.hasNext()) {
            it.next().remove(num);
        }
    }

    private void addTypeSerializers(Integer num, StreamRecordSerializer<?> streamRecordSerializer, StreamRecordSerializer<?> streamRecordSerializer2, StreamRecordSerializer<?> streamRecordSerializer3, StreamRecordSerializer<?> streamRecordSerializer4) {
        this.typeSerializersIn1.put(num, streamRecordSerializer);
        this.typeSerializersIn2.put(num, streamRecordSerializer2);
        this.typeSerializersOut1.put(num, streamRecordSerializer3);
        this.typeSerializersOut2.put(num, streamRecordSerializer4);
    }

    public void setParallelism(Integer num, int i) {
        this.operatorParallelisms.put(num, Integer.valueOf(i));
    }

    public int getParallelism(Integer num) {
        return this.operatorParallelisms.get(num).intValue();
    }

    public void setInputFormat(Integer num, InputFormat<String, ?> inputFormat) {
        this.inputFormatLists.put(num, inputFormat);
    }

    public void setBufferTimeout(Integer num, long j) {
        this.bufferTimeouts.put(num, Long.valueOf(j));
    }

    public long getBufferTimeout(Integer num) {
        return this.bufferTimeouts.get(num).longValue();
    }

    public <T> void setOutputSelector(Integer num, OutputSelector<T> outputSelector) {
        this.outputSelectors.get(num).add(outputSelector);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Outputselector set for {}", num);
        }
    }

    public <IN, OUT> void setInvokable(Integer num, StreamInvokable<IN, OUT> streamInvokable) {
        this.invokableObjects.put(num, streamInvokable);
    }

    public <OUT> void setOutType(Integer num, TypeInformation<OUT> typeInformation) {
        this.typeSerializersOut1.put(num, new StreamRecordSerializer<>(typeInformation, this.executionConfig));
    }

    public StreamInvokable<?, ?> getInvokable(Integer num) {
        return this.invokableObjects.get(num);
    }

    public <OUT> StreamRecordSerializer<OUT> getOutSerializer1(Integer num) {
        return (StreamRecordSerializer) this.typeSerializersOut1.get(num);
    }

    public <OUT> StreamRecordSerializer<OUT> getOutSerializer2(Integer num) {
        return (StreamRecordSerializer) this.typeSerializersOut2.get(num);
    }

    public <IN> StreamRecordSerializer<IN> getInSerializer1(Integer num) {
        return (StreamRecordSerializer) this.typeSerializersIn1.get(num);
    }

    public <IN> StreamRecordSerializer<IN> getInSerializer2(Integer num) {
        return (StreamRecordSerializer) this.typeSerializersIn2.get(num);
    }

    public void setSerializersFrom(Integer num, Integer num2) {
        this.operatorNames.put(num2, this.operatorNames.get(num));
        this.typeSerializersIn1.put(num2, this.typeSerializersOut1.get(num));
        this.typeSerializersIn2.put(num2, this.typeSerializersOut2.get(num));
        this.typeSerializersOut1.put(num2, this.typeSerializersOut1.get(num));
        this.typeSerializersOut2.put(num2, this.typeSerializersOut2.get(num));
    }

    public JobGraph getJobGraph() {
        return getJobGraph(this.jobName);
    }

    public JobGraph getJobGraph(String str) {
        if (isIterative() && isCheckpointingEnabled()) {
            throw new UnsupportedOperationException("Checkpointing is currently not supported for iterative jobs!");
        }
        this.jobName = str;
        WindowingOptimizer.optimizeGraph(this);
        return new StreamingJobGraphGenerator(this).createJobGraph(str);
    }

    public void setJobName(String str) {
        this.jobName = str;
    }

    public void setChaining(boolean z) {
        this.chaining = z;
    }

    public Set<Map.Entry<Integer, StreamInvokable<?, ?>>> getInvokables() {
        return this.invokableObjects.entrySet();
    }

    public Collection<Integer> getSources() {
        return this.sources;
    }

    public StreamEdge getEdge(Integer num, Integer num2) {
        return this.edges.getEdge(num.intValue(), num2.intValue());
    }

    public List<StreamEdge> getOutEdges(Integer num) {
        return this.edges.getOutEdges(num.intValue());
    }

    public List<StreamEdge> getInEdges(Integer num) {
        return this.edges.getInEdges(num.intValue());
    }

    public List<Integer> getOutEdgeIndices(Integer num) {
        return this.edges.getOutEdgeIndices(num.intValue());
    }

    public List<Integer> getInEdgeIndices(Integer num) {
        return this.edges.getInEdgeIndices(num.intValue());
    }

    public Collection<Integer> getIterationIDs() {
        return new HashSet(this.iterationIds.values());
    }

    public Integer getIterationTail(int i) {
        return this.iterationIDtoTailID.get(Integer.valueOf(i));
    }

    public Integer getIterationHead(int i) {
        return this.iterationIDtoHeadID.get(Integer.valueOf(i));
    }

    public Class<? extends AbstractInvokable> getJobVertexClass(Integer num) {
        return this.jobVertexClasses.get(num);
    }

    public InputFormat<String, ?> getInputFormat(Integer num) {
        return this.inputFormatLists.get(num);
    }

    public OutputSelectorWrapper<?> getOutputSelectorWrapper(Integer num) {
        return OutputSelectorWrapperFactory.create(this.outputSelectors.get(num));
    }

    public Integer getIterationID(Integer num) {
        return this.iterationIds.get(num);
    }

    public long getIterationTimeout(Integer num) {
        return this.iterationTimeouts.get(num).longValue();
    }

    public boolean isIterative() {
        return !this.iterationIds.isEmpty();
    }

    public String getOperatorName(Integer num) {
        return this.operatorNames.get(num);
    }

    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    public void setCheckpointingEnabled(boolean z) {
        this.checkpointingEnabled = z;
    }

    public boolean isCheckpointingEnabled() {
        return this.checkpointingEnabled;
    }

    public void setCheckpointingInterval(long j) {
        this.checkpointingInterval = j;
    }

    public long getCheckpointingInterval() {
        return this.checkpointingInterval;
    }

    public String getStreamingPlanAsJSON() {
        WindowingOptimizer.optimizeGraph(this);
        try {
            return new JSONGenerator().getJSON();
        } catch (JSONException e) {
            if (!LOG.isDebugEnabled()) {
                return "";
            }
            LOG.debug("JSON plan creation failed: {}", e);
            return "";
        }
    }

    public void dumpStreamingPlanAsJSON(File file) throws IOException {
        PrintWriter printWriter = null;
        try {
            printWriter = new PrintWriter((OutputStream) new FileOutputStream(file), false);
            printWriter.write(getStreamingPlanAsJSON());
            printWriter.flush();
            if (printWriter != null) {
                printWriter.close();
            }
        } catch (Throwable th) {
            if (printWriter != null) {
                printWriter.close();
            }
            throw th;
        }
    }
}
