package org.apache.flink.streaming.api.graph;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.sling.commons.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraph.class */
public class StreamGraph extends StreamingPlan {
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
    private final StreamExecutionEnvironment environemnt;
    private final ExecutionConfig executionConfig;
    private Map<Integer, StreamNode> streamNodes;
    private Set<Integer> sources;
    private Map<Integer, StreamLoop> streamLoops;
    protected Map<Integer, StreamLoop> vertexIDtoLoop;
    private StateHandleProvider<?> stateHandleProvider;
    private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
    private boolean checkpointingEnabled = false;
    private long checkpointingInterval = 5000;
    private boolean chaining = true;
    private boolean forceCheckpoint = false;

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraph$ResourceStrategy.class */
    public enum ResourceStrategy {
        DEFAULT,
        ISOLATE,
        NEWGROUP
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraph$StreamLoop.class */
    public static class StreamLoop {
        private Integer loopID;
        private StreamNode source;
        private StreamNode sink;
        private Long timeout;

        public StreamLoop(Integer num, StreamNode streamNode, Long l) {
            this.loopID = num;
            this.source = streamNode;
            this.timeout = l;
        }

        public Integer getID() {
            return this.loopID;
        }

        public Long getTimeout() {
            return this.timeout;
        }

        public void setSink(StreamNode streamNode) {
            this.sink = streamNode;
        }

        public StreamNode getSource() {
            return this.source;
        }

        public StreamNode getSink() {
            return this.sink;
        }
    }

    public StreamGraph(StreamExecutionEnvironment streamExecutionEnvironment) {
        this.environemnt = streamExecutionEnvironment;
        this.executionConfig = streamExecutionEnvironment.getConfig();
        clear();
    }

    public void clear() {
        this.streamNodes = new HashMap();
        this.streamLoops = new HashMap();
        this.vertexIDtoLoop = new HashMap();
        this.sources = new HashSet();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    public void setJobName(String str) {
        this.jobName = str;
    }

    public void setChaining(boolean z) {
        this.chaining = z;
    }

    public void setCheckpointingEnabled(boolean z) {
        this.checkpointingEnabled = z;
    }

    public void setCheckpointingInterval(long j) {
        this.checkpointingInterval = j;
    }

    public void forceCheckpoint() {
        this.forceCheckpoint = true;
    }

    public void setStateHandleProvider(StateHandleProvider<?> stateHandleProvider) {
        this.stateHandleProvider = stateHandleProvider;
    }

    public StateHandleProvider<?> getStateHandleProvider() {
        return this.stateHandleProvider;
    }

    public long getCheckpointingInterval() {
        return this.checkpointingInterval;
    }

    public boolean isChainingEnabled() {
        return this.chaining;
    }

    public boolean isCheckpointingEnabled() {
        return this.checkpointingEnabled;
    }

    public boolean isIterative() {
        return !this.streamLoops.isEmpty();
    }

    public <IN, OUT> void addSource(Integer num, StreamOperator<OUT> streamOperator, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str) {
        addOperator(num, streamOperator, typeInformation, typeInformation2, str);
        this.sources.add(num);
    }

    public <IN, OUT> void addOperator(Integer num, StreamOperator<OUT> streamOperator, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str) {
        if (streamOperator instanceof StreamSource) {
            addNode(num, SourceStreamTask.class, streamOperator, str);
        } else {
            addNode(num, OneInputStreamTask.class, streamOperator, str);
        }
        setSerializers(num, typeInformation != null ? new StreamRecordSerializer<>(typeInformation, this.executionConfig) : null, null, (typeInformation2 == null || (typeInformation2 instanceof MissingTypeInfo)) ? null : new StreamRecordSerializer<>(typeInformation2, this.executionConfig));
        if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex: {}", num);
        }
    }

    public <IN1, IN2, OUT> void addCoOperator(Integer num, TwoInputStreamOperator<IN1, IN2, OUT> twoInputStreamOperator, TypeInformation<IN1> typeInformation, TypeInformation<IN2> typeInformation2, TypeInformation<OUT> typeInformation3, String str) {
        addNode(num, TwoInputStreamTask.class, twoInputStreamOperator, str);
        setSerializers(num, new StreamRecordSerializer<>(typeInformation, this.executionConfig), new StreamRecordSerializer<>(typeInformation2, this.executionConfig), (typeInformation3 == null || (typeInformation3 instanceof MissingTypeInfo)) ? null : new StreamRecordSerializer<>(typeInformation3, this.executionConfig));
        if (LOG.isDebugEnabled()) {
            LOG.debug("CO-TASK: {}", num);
        }
    }

    public void addIterationHead(Integer num, Integer num2, Integer num3, long j) {
        StreamNode addNode = addNode(num, StreamIterationHead.class, null, null);
        StreamLoop streamLoop = new StreamLoop(num3, getStreamNode(num), Long.valueOf(j));
        this.streamLoops.put(num3, streamLoop);
        this.vertexIDtoLoop.put(num, streamLoop);
        setSerializersFrom(num2, num);
        addNode.setOperatorName("IterationSource-" + num);
        addNode.setParallelism(Integer.valueOf(getStreamNode(num2).getParallelism()));
        addEdge(num, num2, new RebalancePartitioner(true), 0, new ArrayList());
        if (LOG.isDebugEnabled()) {
            LOG.debug("ITERATION SOURCE: {}", num);
        }
        this.sources.add(num);
    }

    public void addIterationTail(Integer num, Integer num2, Integer num3, long j) {
        if (getStreamNode(num2).getBufferTimeout().longValue() == 0) {
            throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
        }
        StreamNode addNode = addNode(num, StreamIterationTail.class, null, null);
        StreamLoop streamLoop = this.streamLoops.get(num3);
        streamLoop.setSink(getStreamNode(num));
        this.vertexIDtoLoop.put(num, streamLoop);
        addNode.setParallelism(Integer.valueOf(streamLoop.getSource().getParallelism()));
        setSerializersFrom(num2, num);
        getStreamNode(num).setOperatorName("IterationSink-" + num);
        setBufferTimeout(streamLoop.getSource().getId(), getStreamNode(num2).getBufferTimeout().longValue());
        if (LOG.isDebugEnabled()) {
            LOG.debug("ITERATION SINK: {}", num);
        }
    }

    protected StreamNode addNode(Integer num, Class<? extends AbstractInvokable> cls, StreamOperator<?> streamOperator, String str) {
        StreamNode streamNode = new StreamNode(this.environemnt, num, streamOperator, str, new ArrayList(), cls);
        this.streamNodes.put(num, streamNode);
        return streamNode;
    }

    public void addEdge(Integer num, Integer num2, StreamPartitioner<?> streamPartitioner, int i, List<String> list) {
        StreamEdge streamEdge = new StreamEdge(getStreamNode(num), getStreamNode(num2), i, list, streamPartitioner);
        getStreamNode(Integer.valueOf(streamEdge.getSourceId())).addOutEdge(streamEdge);
        getStreamNode(Integer.valueOf(streamEdge.getTargetId())).addInEdge(streamEdge);
    }

    public <T> void addOutputSelector(Integer num, OutputSelector<T> outputSelector) {
        getStreamNode(num).addOutputSelector(outputSelector);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Outputselector set for {}", num);
        }
    }

    public void setParallelism(Integer num, int i) {
        getStreamNode(num).setParallelism(Integer.valueOf(i));
    }

    public void setBufferTimeout(Integer num, long j) {
        getStreamNode(num).setBufferTimeout(Long.valueOf(j));
    }

    private void setSerializers(Integer num, StreamRecordSerializer<?> streamRecordSerializer, StreamRecordSerializer<?> streamRecordSerializer2, StreamRecordSerializer<?> streamRecordSerializer3) {
        StreamNode streamNode = getStreamNode(num);
        streamNode.setSerializerIn1(streamRecordSerializer);
        streamNode.setSerializerIn2(streamRecordSerializer2);
        streamNode.setSerializerOut(streamRecordSerializer3);
    }

    private void setSerializersFrom(Integer num, Integer num2) {
        StreamNode streamNode = getStreamNode(num);
        StreamNode streamNode2 = getStreamNode(num2);
        streamNode2.setSerializerIn1(streamNode.getTypeSerializerOut());
        streamNode2.setSerializerOut(streamNode.getTypeSerializerIn1());
    }

    public <OUT> void setOutType(Integer num, TypeInformation<OUT> typeInformation) {
        getStreamNode(num).setSerializerOut(new StreamRecordSerializer<>(typeInformation, this.executionConfig));
    }

    public <IN, OUT> void setOperator(Integer num, StreamOperator<OUT> streamOperator) {
        getStreamNode(num).setOperator(streamOperator);
    }

    public void setInputFormat(Integer num, InputFormat<?, ?> inputFormat) {
        getStreamNode(num).setInputFormat(inputFormat);
    }

    public void setResourceStrategy(Integer num, ResourceStrategy resourceStrategy) {
        StreamNode streamNode = getStreamNode(num);
        switch (resourceStrategy) {
            case ISOLATE:
                streamNode.isolateSlot();
                return;
            case NEWGROUP:
                streamNode.startNewSlotSharingGroup();
                return;
            default:
                throw new IllegalArgumentException("Unknown resource strategy");
        }
    }

    public StreamNode getStreamNode(Integer num) {
        return this.streamNodes.get(num);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Collection<? extends Integer> getVertexIDs() {
        return this.streamNodes.keySet();
    }

    public StreamEdge getStreamEdge(int i, int i2) {
        for (StreamEdge streamEdge : getStreamNode(Integer.valueOf(i)).getOutEdges()) {
            if (streamEdge.getTargetId() == i2) {
                return streamEdge;
            }
        }
        throw new RuntimeException("No such edge in stream graph: " + i + " -> " + i2);
    }

    public Collection<Integer> getSourceIDs() {
        return this.sources;
    }

    public Collection<StreamNode> getStreamNodes() {
        return this.streamNodes.values();
    }

    public Set<Tuple2<Integer, StreamOperator<?>>> getOperators() {
        HashSet hashSet = new HashSet();
        for (StreamNode streamNode : this.streamNodes.values()) {
            hashSet.add(new Tuple2(streamNode.getId(), streamNode.getOperator()));
        }
        return hashSet;
    }

    public Collection<StreamLoop> getStreamLoops() {
        return this.streamLoops.values();
    }

    public Integer getLoopID(Integer num) {
        return this.vertexIDtoLoop.get(num).getID();
    }

    public long getLoopTimeout(Integer num) {
        return this.vertexIDtoLoop.get(num).getTimeout().longValue();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void removeEdge(StreamEdge streamEdge) {
        streamEdge.getSourceVertex().getOutEdges().remove(streamEdge);
        streamEdge.getTargetVertex().getInEdges().remove(streamEdge);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void removeVertex(StreamNode streamNode) {
        HashSet hashSet = new HashSet();
        hashSet.addAll(streamNode.getInEdges());
        hashSet.addAll(streamNode.getOutEdges());
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            removeEdge((StreamEdge) it.next());
        }
        this.streamNodes.remove(streamNode.getId());
    }

    @Override // org.apache.flink.optimizer.plan.StreamingPlan
    public JobGraph getJobGraph() {
        return getJobGraph(this.jobName);
    }

    public JobGraph getJobGraph(String str) {
        if (isIterative() && isCheckpointingEnabled() && !this.forceCheckpoint) {
            throw new UnsupportedOperationException("Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. \nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
        }
        setJobName(str);
        WindowingOptimizer.optimizeGraph(this);
        return new StreamingJobGraphGenerator(this).createJobGraph(str);
    }

    @Override // org.apache.flink.optimizer.plan.StreamingPlan
    public String getStreamingPlanAsJSON() {
        WindowingOptimizer.optimizeGraph(this);
        try {
            return new JSONGenerator(this).getJSON();
        } catch (JSONException e) {
            if (!LOG.isDebugEnabled()) {
                return "";
            }
            LOG.debug("JSON plan creation failed: {}", (Throwable) e);
            return "";
        }
    }

    @Override // org.apache.flink.optimizer.plan.StreamingPlan
    public void dumpStreamingPlanAsJSON(File file) throws IOException {
        PrintWriter printWriter = null;
        try {
            printWriter = new PrintWriter((OutputStream) new FileOutputStream(file), false);
            printWriter.write(getStreamingPlanAsJSON());
            printWriter.flush();
            if (printWriter != null) {
                printWriter.close();
            }
        } catch (Throwable th) {
            if (printWriter != null) {
                printWriter.close();
            }
            throw th;
        }
    }
}
