/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.shaded.com.google.common.hash.HashFunction;
import org.apache.flink.shaded.com.google.common.hash.Hasher;
import org.apache.flink.shaded.com.google.common.hash.Hashing;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamingJobGraphGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
    private static final long DEFAULT_RESTART_DELAY = 10000L;
    private StreamGraph streamGraph;
    private Map<Integer, JobVertex> jobVertices;
    private JobGraph jobGraph;
    private Collection<Integer> builtVertices;
    private List<StreamEdge> physicalEdgesInOrder;
    private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
    private Map<Integer, StreamConfig> vertexConfigs;
    private Map<Integer, String> chainedNames;

    public StreamingJobGraphGenerator(StreamGraph streamGraph) {
        this.streamGraph = streamGraph;
    }

    private void init() {
        this.jobVertices = new HashMap<Integer, JobVertex>();
        this.builtVertices = new HashSet<Integer>();
        this.chainedConfigs = new HashMap<Integer, Map<Integer, StreamConfig>>();
        this.vertexConfigs = new HashMap<Integer, StreamConfig>();
        this.chainedNames = new HashMap<Integer, String>();
        this.physicalEdgesInOrder = new ArrayList<StreamEdge>();
    }

    public JobGraph createJobGraph() {
        this.jobGraph = new JobGraph(this.streamGraph.getJobName());
        this.jobGraph.setScheduleMode(ScheduleMode.ALL);
        this.init();
        Map<Integer, byte[]> hashes = this.traverseStreamGraphAndGenerateHashes();
        this.setChaining(hashes);
        this.setPhysicalEdges();
        this.setSlotSharing();
        this.configureCheckpointing();
        this.jobGraph.setExecutionConfig(this.streamGraph.getExecutionConfig());
        return this.jobGraph;
    }

    private void setPhysicalEdges() {
        HashMap<Integer, ArrayList<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, ArrayList<StreamEdge>>();
        for (StreamEdge streamEdge : this.physicalEdgesInOrder) {
            int target = streamEdge.getTargetId();
            ArrayList<StreamEdge> inEdges = (ArrayList<StreamEdge>)physicalInEdgesInOrder.get(target);
            if (inEdges == null) {
                inEdges = new ArrayList<StreamEdge>();
                physicalInEdgesInOrder.put(target, inEdges);
            }
            inEdges.add(streamEdge);
        }
        for (Map.Entry entry : physicalInEdgesInOrder.entrySet()) {
            int vertex = (Integer)entry.getKey();
            List edgeList = (List)entry.getValue();
            this.vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
        }
    }

    private void setChaining(Map<Integer, byte[]> hashes) {
        for (Integer sourceNodeId : this.streamGraph.getSourceIDs()) {
            this.createChain(sourceNodeId, sourceNodeId, hashes, 0);
        }
    }

    private List<StreamEdge> createChain(Integer startNodeId, Integer currentNodeId, Map<Integer, byte[]> hashes, int chainIndex) {
        if (!this.builtVertices.contains(startNodeId)) {
            ArrayList<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
            ArrayList<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            ArrayList<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
            for (StreamEdge streamEdge : this.streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
                if (this.isChainable(streamEdge)) {
                    chainableOutputs.add(streamEdge);
                    continue;
                }
                nonChainableOutputs.add(streamEdge);
            }
            for (StreamEdge streamEdge : chainableOutputs) {
                transitiveOutEdges.addAll(this.createChain(startNodeId, streamEdge.getTargetId(), hashes, chainIndex + 1));
            }
            for (StreamEdge streamEdge : nonChainableOutputs) {
                transitiveOutEdges.add(streamEdge);
                this.createChain(streamEdge.getTargetId(), streamEdge.getTargetId(), hashes, 0);
            }
            this.chainedNames.put(currentNodeId, this.createChainedName(currentNodeId, chainableOutputs));
            StreamConfig config = currentNodeId.equals(startNodeId) ? this.createJobVertex(startNodeId, hashes) : new StreamConfig(new Configuration());
            this.setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
            if (currentNodeId.equals(startNodeId)) {
                config.setChainStart();
                config.setChainIndex(0);
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setOutEdges(this.streamGraph.getStreamNode(currentNodeId).getOutEdges());
                for (StreamEdge edge : transitiveOutEdges) {
                    this.connect(startNodeId, edge);
                }
                config.setTransitiveChainedTaskConfigs(this.chainedConfigs.get(startNodeId));
            } else {
                Map<Integer, StreamConfig> map = this.chainedConfigs.get(startNodeId);
                if (map == null) {
                    this.chainedConfigs.put(startNodeId, new HashMap());
                }
                config.setChainIndex(chainIndex);
                this.chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }
            return transitiveOutEdges;
        }
        return new ArrayList<StreamEdge>();
    }

    private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
        String operatorName = this.streamGraph.getStreamNode(vertexID).getOperatorName();
        if (chainedOutputs.size() > 1) {
            ArrayList<String> outputChainedNames = new ArrayList<String>();
            for (StreamEdge chainable : chainedOutputs) {
                outputChainedNames.add(this.chainedNames.get(chainable.getTargetId()));
            }
            return operatorName + " -> (" + org.apache.commons.lang3.StringUtils.join(outputChainedNames, (String)", ") + ")";
        }
        if (chainedOutputs.size() == 1) {
            return operatorName + " -> " + this.chainedNames.get(chainedOutputs.get(0).getTargetId());
        }
        return operatorName;
    }

    private StreamConfig createJobVertex(Integer streamNodeId, Map<Integer, byte[]> hashes) {
        InputFormatVertex jobVertex;
        StreamNode streamNode = this.streamGraph.getStreamNode(streamNodeId);
        byte[] hash = hashes.get(streamNodeId);
        if (hash == null) {
            throw new IllegalStateException("Cannot find node hash. Did you generate them before calling this method?");
        }
        JobVertexID jobVertexId = new JobVertexID(hash);
        if (streamNode.getInputFormat() != null) {
            jobVertex = new InputFormatVertex(this.chainedNames.get(streamNodeId), jobVertexId);
            TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
            taskConfig.setStubWrapper((UserCodeWrapper)new UserCodeObjectWrapper(streamNode.getInputFormat()));
        } else {
            jobVertex = new JobVertex(this.chainedNames.get(streamNodeId), jobVertexId);
        }
        jobVertex.setInvokableClass(streamNode.getJobVertexClass());
        int parallelism = streamNode.getParallelism();
        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", (Object)parallelism, (Object)streamNodeId);
        }
        this.jobVertices.put(streamNodeId, (JobVertex)jobVertex);
        this.builtVertices.add(streamNodeId);
        this.jobGraph.addVertex((JobVertex)jobVertex);
        return new StreamConfig(jobVertex.getConfiguration());
    }

    private void setVertexConfig(Integer vertexID, StreamConfig config, List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
        StreamNode vertex = this.streamGraph.getStreamNode(vertexID);
        config.setVertexID(vertexID);
        config.setBufferTimeout(vertex.getBufferTimeout());
        config.setTypeSerializerIn1(vertex.getTypeSerializerIn1());
        config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
        config.setTypeSerializerOut(vertex.getTypeSerializerOut());
        config.setStreamOperator(vertex.getOperator());
        config.setOutputSelectors(vertex.getOutputSelectors());
        config.setNumberOfOutputs(nonChainableOutputs.size());
        config.setNonChainedOutputs(nonChainableOutputs);
        config.setChainedOutputs(chainableOutputs);
        config.setTimeCharacteristic(this.streamGraph.getEnvironment().getStreamTimeCharacteristic());
        CheckpointConfig ceckpointCfg = this.streamGraph.getCheckpointConfig();
        config.setStateBackend(this.streamGraph.getStateBackend());
        config.setCheckpointingEnabled(ceckpointCfg.isCheckpointingEnabled());
        if (ceckpointCfg.isCheckpointingEnabled()) {
            config.setCheckpointMode(ceckpointCfg.getCheckpointingMode());
        } else {
            config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
        }
        config.setStatePartitioner(0, vertex.getStatePartitioner1());
        config.setStatePartitioner(1, vertex.getStatePartitioner2());
        config.setStateKeySerializer(vertex.getStateKeySerializer());
        Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
        if (vertexClass.equals(StreamIterationHead.class) || vertexClass.equals(StreamIterationTail.class)) {
            config.setIterationId(this.streamGraph.getBrokerID(vertexID));
            config.setIterationWaitTime(this.streamGraph.getLoopTimeout(vertexID));
        }
        ArrayList<StreamEdge> allOutputs = new ArrayList<StreamEdge>(chainableOutputs);
        allOutputs.addAll(nonChainableOutputs);
        this.vertexConfigs.put(vertexID, config);
    }

    private void connect(Integer headOfChain, StreamEdge edge) {
        this.physicalEdgesInOrder.add(edge);
        Integer downStreamvertexID = edge.getTargetId();
        JobVertex headVertex = this.jobVertices.get(headOfChain);
        JobVertex downStreamVertex = this.jobVertices.get(downStreamvertexID);
        StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
        downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);
        StreamPartitioner<?> partitioner = edge.getPartitioner();
        if (partitioner instanceof ForwardPartitioner) {
            downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED, true);
        } else if (partitioner instanceof RescalePartitioner) {
            downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED, true);
        } else {
            downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, true);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}", new Object[]{partitioner.getClass().getSimpleName(), headOfChain, downStreamvertexID});
        }
    }

    private boolean isChainable(StreamEdge edge) {
        StreamNode upStreamVertex = edge.getSourceVertex();
        StreamNode downStreamVertex = edge.getTargetVertex();
        StreamOperator<?> headOperator = upStreamVertex.getOperator();
        StreamOperator<?> outOperator = downStreamVertex.getOperator();
        return downStreamVertex.getInEdges().size() == 1 && outOperator != null && headOperator != null && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) && edge.getPartitioner() instanceof ForwardPartitioner && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && this.streamGraph.isChainingEnabled();
    }

    private void setSlotSharing() {
        HashMap<String, SlotSharingGroup> slotSharingGroups = new HashMap<String, SlotSharingGroup>();
        for (Map.Entry<Integer, JobVertex> entry : this.jobVertices.entrySet()) {
            String slotSharingGroup = this.streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();
            SlotSharingGroup group = (SlotSharingGroup)slotSharingGroups.get(slotSharingGroup);
            if (group == null) {
                group = new SlotSharingGroup();
                slotSharingGroups.put(slotSharingGroup, group);
            }
            entry.getValue().setSlotSharingGroup(group);
        }
        for (Tuple2 tuple2 : this.streamGraph.getIterationSourceSinkPairs()) {
            CoLocationGroup ccg = new CoLocationGroup();
            JobVertex source = this.jobVertices.get(((StreamNode)tuple2.f0).getId());
            JobVertex sink = this.jobVertices.get(((StreamNode)tuple2.f1).getId());
            ccg.addVertex(source);
            ccg.addVertex(sink);
            source.updateCoLocationGroup(ccg);
            sink.updateCoLocationGroup(ccg);
        }
    }

    private void configureCheckpointing() {
        CheckpointConfig cfg = this.streamGraph.getCheckpointConfig();
        if (cfg.isCheckpointingEnabled()) {
            long interval = cfg.getCheckpointInterval();
            if (interval < 1L) {
                throw new IllegalArgumentException("The checkpoint interval must be positive");
            }
            ArrayList<JobVertexID> triggerVertices = new ArrayList<JobVertexID>();
            ArrayList<JobVertexID> ackVertices = new ArrayList<JobVertexID>(this.jobVertices.size());
            ArrayList<JobVertexID> commitVertices = new ArrayList<JobVertexID>();
            for (JobVertex vertex : this.jobVertices.values()) {
                if (vertex.isInputVertex()) {
                    triggerVertices.add(vertex.getID());
                }
                commitVertices.add(vertex.getID());
                ackVertices.add(vertex.getID());
            }
            JobSnapshottingSettings settings = new JobSnapshottingSettings(triggerVertices, ackVertices, commitVertices, interval, cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(), cfg.getMaxConcurrentCheckpoints());
            this.jobGraph.setSnapshotSettings(settings);
            if (this.streamGraph.getExecutionConfig().getRestartStrategy() == null) {
                this.streamGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart((int)Integer.MAX_VALUE, (long)10000L));
            }
        }
    }

    private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
        StreamNode currentNode;
        HashFunction hashFunction = Hashing.murmur3_128(0);
        HashMap<Integer, byte[]> hashes = new HashMap<Integer, byte[]>();
        HashSet<Integer> visited = new HashSet<Integer>();
        ArrayDeque<StreamNode> remaining = new ArrayDeque<StreamNode>();
        ArrayList<Integer> sources = new ArrayList<Integer>();
        for (Integer sourceNodeId : this.streamGraph.getSourceIDs()) {
            sources.add(sourceNodeId);
        }
        Collections.sort(sources);
        for (Integer sourceNodeId : sources) {
            remaining.add(this.streamGraph.getStreamNode(sourceNodeId));
            visited.add(sourceNodeId);
        }
        while ((currentNode = (StreamNode)remaining.poll()) != null) {
            if (this.generateNodeHash(currentNode, hashFunction, hashes)) {
                for (StreamEdge outEdge : currentNode.getOutEdges()) {
                    StreamNode child = outEdge.getTargetVertex();
                    if (visited.contains(child.getId())) continue;
                    remaining.add(child);
                    visited.add(child.getId());
                }
                continue;
            }
            visited.remove(currentNode.getId());
        }
        return hashes;
    }

    private boolean generateNodeHash(StreamNode node, HashFunction hashFunction, Map<Integer, byte[]> hashes) {
        String userSpecifiedHash = node.getTransformationId();
        if (userSpecifiedHash == null) {
            for (StreamEdge inEdge : node.getInEdges()) {
                if (hashes.containsKey(inEdge.getSourceId())) continue;
                return false;
            }
            Hasher hasher = hashFunction.newHasher();
            byte[] hash = this.generateDeterministicHash(node, hasher, hashes);
            if (hashes.put(node.getId(), hash) != null) {
                throw new IllegalStateException("Unexpected state. Tried to add node hash twice. This is probably a bug in the JobGraph generator.");
            }
            return true;
        }
        for (StreamEdge inEdge : node.getInEdges()) {
            if (!this.isChainable(inEdge)) continue;
            throw new UnsupportedOperationException("Cannot assign user-specified hash to intermediate node in chain. This will be supported in future versions of Flink. As a work around start new chain at task " + node.getOperatorName() + ".");
        }
        Hasher hasher = hashFunction.newHasher();
        byte[] hash = this.generateUserSpecifiedHash(node, hasher);
        for (byte[] previousHash : hashes.values()) {
            if (!Arrays.equals(previousHash, hash)) continue;
            throw new IllegalArgumentException("Hash collision on user-specified ID. Most likely cause is a non-unique ID. Please check that all IDs specified via `uid(String)` are unique.");
        }
        if (hashes.put(node.getId(), hash) != null) {
            throw new IllegalStateException("Unexpected state. Tried to add node hash twice. This is probably a bug in the JobGraph generator.");
        }
        return true;
    }

    private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
        hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
        return hasher.hash().asBytes();
    }

    /*
     * WARNING - void declaration
     */
    private byte[] generateDeterministicHash(StreamNode node, Hasher hasher, Map<Integer, byte[]> hashes) {
        this.generateNodeLocalHash(node, hasher, hashes.size());
        for (StreamEdge streamEdge : node.getOutEdges()) {
            if (!this.isChainable(streamEdge)) continue;
            StreamNode chainedNode = streamEdge.getTargetVertex();
            this.generateNodeLocalHash(chainedNode, hasher, hashes.size());
        }
        byte[] hash = hasher.hash().asBytes();
        for (StreamEdge inEdge : node.getInEdges()) {
            byte[] otherHash = hashes.get(inEdge.getSourceId());
            if (otherHash == null) {
                throw new IllegalStateException("Missing hash for input node " + inEdge.getSourceVertex() + ". Cannot generate hash for " + node + ".");
            }
            for (int j = 0; j < hash.length; ++j) {
                hash[j] = (byte)(hash[j] * 37 ^ otherHash[j]);
            }
        }
        if (LOG.isDebugEnabled()) {
            void var5_9;
            String string = "";
            if (node.getOperator() instanceof AbstractUdfStreamOperator) {
                String string2 = ((AbstractUdfStreamOperator)node.getOperator()).getUserFunction().getClass().getName();
            }
            LOG.debug("Generated hash '" + StringUtils.byteToHexString((byte[])hash) + "' for node " + "'" + node.toString() + "' {id: " + node.getId() + ", " + "parallelism: " + node.getParallelism() + ", " + "user function: " + (String)var5_9 + "}");
        }
        return hash;
    }

    private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
        hasher.putInt(id);
        hasher.putInt(node.getParallelism());
        if (node.getOperator() instanceof AbstractUdfStreamOperator) {
            String udfClassName = ((AbstractUdfStreamOperator)node.getOperator()).getUserFunction().getClass().getName();
            hasher.putString(udfClassName, Charset.forName("UTF-8"));
        }
    }
}

