package org.apache.storm.streams;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.storm.annotation.InterfaceStability;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.com.google.common.collect.ArrayListMultimap;
import org.apache.storm.shade.com.google.common.collect.HashBasedTable;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.shade.com.google.common.collect.Table;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.graph.DefaultDirectedGraph;
import org.apache.storm.shade.org.jgrapht.traverse.TopologicalOrderIterator;
import org.apache.storm.stats.ClientStatsUtil;
import org.apache.storm.streams.operations.IdentityFunction;
import org.apache.storm.streams.operations.mappers.PairValueMapper;
import org.apache.storm.streams.operations.mappers.TupleValueMapper;
import org.apache.storm.streams.processors.MapProcessor;
import org.apache.storm.streams.processors.StateQueryProcessor;
import org.apache.storm.streams.processors.StatefulProcessor;
import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
import org.apache.storm.streams.windowing.Window;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.IComponent;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/storm/streams/StreamBuilder.class */
public class StreamBuilder {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) StreamBuilder.class);
    private final Table<Node, String, GroupingInfo> nodeGroupingInfo = HashBasedTable.create();
    private final Map<Node, WindowNode> windowInfo = new HashMap();
    private final List<ProcessorNode> curGroup = new ArrayList();
    private final Map<StreamBolt, BoltDeclarer> streamBolts = new HashMap();
    private int statefulProcessorCount = 0;
    private String timestampFieldName = null;
    private final DefaultDirectedGraph<Node, Edge> graph = new DefaultDirectedGraph<>(new StreamsEdgeFactory());

    public Stream<Tuple> newStream(IRichSpout iRichSpout) {
        return newStream(iRichSpout, 1);
    }

    public Stream<Tuple> newStream(IRichSpout iRichSpout, int i) {
        SpoutNode spoutNode = new SpoutNode(iRichSpout);
        spoutNode.setComponentId(UniqueIdGen.getInstance().getUniqueSpoutId());
        spoutNode.setParallelism(i);
        this.graph.addVertex(spoutNode);
        return new Stream<>(this, spoutNode);
    }

    public <T> Stream<T> newStream(IRichSpout iRichSpout, TupleValueMapper<T> tupleValueMapper) {
        return (Stream<T>) newStream(iRichSpout).map(tupleValueMapper);
    }

    public <T> Stream<T> newStream(IRichSpout iRichSpout, TupleValueMapper<T> tupleValueMapper, int i) {
        return (Stream<T>) newStream(iRichSpout, i).map(tupleValueMapper);
    }

    public <K, V> PairStream<K, V> newStream(IRichSpout iRichSpout, PairValueMapper<K, V> pairValueMapper) {
        return newStream(iRichSpout).mapToPair(pairValueMapper);
    }

    public <K, V> PairStream<K, V> newStream(IRichSpout iRichSpout, PairValueMapper<K, V> pairValueMapper, int i) {
        return newStream(iRichSpout, i).mapToPair(pairValueMapper);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public StormTopology build() {
        this.nodeGroupingInfo.clear();
        this.windowInfo.clear();
        this.curGroup.clear();
        TopologicalOrderIterator topologicalOrderIterator = new TopologicalOrderIterator((DirectedGraph) this.graph, (Queue) queue());
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        while (topologicalOrderIterator.hasNext()) {
            Node node = (Node) topologicalOrderIterator.next();
            if (node instanceof SpoutNode) {
                addSpout(topologyBuilder, (SpoutNode) node);
            } else if (node instanceof ProcessorNode) {
                handleProcessorNode((ProcessorNode) node, topologyBuilder);
            } else if (node instanceof PartitionNode) {
                updateNodeGroupingInfo((PartitionNode) node);
                processCurGroup(topologyBuilder);
            } else if (node instanceof WindowNode) {
                updateWindowInfo((WindowNode) node);
                processCurGroup(topologyBuilder);
            } else if (node instanceof SinkNode) {
                processCurGroup(topologyBuilder);
                addSink(topologyBuilder, (SinkNode) node);
            }
        }
        processCurGroup(topologyBuilder);
        mayBeAddTsField();
        return topologyBuilder.createTopology();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Node addNode(Node node, Node node2) {
        return addNode(node, node2, node.getOutputStreams().iterator().next(), node.getParallelism());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Node addNode(Node node, Node node2, int i) {
        return addNode(node, node2, node.getOutputStreams().iterator().next(), i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Node insert(Node node, Node node2) {
        Node addNode = addNode(node, node2);
        Iterator<Edge> it = this.graph.outgoingEdgesOf(node).iterator();
        while (it.hasNext()) {
            Node target = it.next().getTarget();
            this.graph.removeEdge(node, target);
            target.removeParentStreams(node);
            addNode(addNode, target);
        }
        return addNode;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Node addNode(Node node, Node node2, String str) {
        return addNode(node, node2, str, node.getParallelism());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Node addNode(Node node, Node node2, String str, int i) {
        this.graph.addVertex(node2);
        this.graph.addEdge(node, node2);
        node2.setParallelism(i);
        if ((node instanceof WindowNode) || (node instanceof PartitionNode)) {
            node2.addParentStream(parentNode(node), str);
        } else {
            node2.addParentStream(node, str);
        }
        if (!(node2 instanceof PartitionNode)) {
            if (node2.getGroupingInfo() == null) {
                node2.setGroupingInfo(node.getGroupingInfo());
            } else if (!node2.getGroupingInfo().equals(node.getGroupingInfo())) {
                throw new IllegalStateException("Trying to assign grouping info for node with current grouping info: " + node2.getGroupingInfo() + " to: " + node.getGroupingInfo() + " Node: " + node2);
            }
        }
        if (!(node2 instanceof WindowNode) && !node2.isWindowed()) {
            node2.setWindowed(node.isWindowed());
        }
        return node2;
    }

    private PriorityQueue<Node> queue() {
        return new PriorityQueue<>(new Comparator<Node>() { // from class: org.apache.storm.streams.StreamBuilder.1
            Map<Class<?>, Integer> p = new HashMap();

            {
                this.p.put(SpoutNode.class, 0);
                this.p.put(UpdateStateByKeyProcessor.class, 1);
                this.p.put(ProcessorNode.class, 2);
                this.p.put(PartitionNode.class, 3);
                this.p.put(WindowNode.class, 4);
                this.p.put(StateQueryProcessor.class, 5);
                this.p.put(SinkNode.class, 6);
            }

            @Override // java.util.Comparator
            public int compare(Node node, Node node2) {
                return getPriority(node) - getPriority(node2);
            }

            private int getPriority(Node node) {
                if (node instanceof ProcessorNode) {
                    Integer num = this.p.get(((ProcessorNode) node).getProcessor().getClass());
                    if (num != null) {
                        return num.intValue();
                    }
                }
                Integer num2 = this.p.get(node.getClass());
                if (num2 != null) {
                    return num2.intValue();
                }
                return Integer.MAX_VALUE;
            }
        });
    }

    private void handleProcessorNode(ProcessorNode processorNode, TopologyBuilder topologyBuilder) {
        if (processorNode.getProcessor() instanceof StatefulProcessor) {
            this.statefulProcessorCount++;
            Set<Window<?, ?>> windowParams = getWindowParams(initialProcessors(this.curGroup.isEmpty() ? Collections.singletonList(processorNode) : this.curGroup));
            if (this.statefulProcessorCount > 1 || !windowParams.isEmpty()) {
                if (!this.curGroup.isEmpty()) {
                    processCurGroup(topologyBuilder);
                } else if (!windowParams.isEmpty()) {
                    splitStatefulProcessor(processorNode, topologyBuilder);
                }
                this.statefulProcessorCount = 1;
            }
        }
        this.curGroup.add(processorNode);
    }

    private void splitStatefulProcessor(ProcessorNode processorNode, TopologyBuilder topologyBuilder) {
        for (Node node : StreamUtil.getParents(this.graph, processorNode)) {
            ProcessorNode processorNode2 = new ProcessorNode(new MapProcessor(new IdentityFunction()), UniqueIdGen.getInstance().getUniqueStreamId(), node.getOutputFields());
            addNode(node, processorNode2);
            this.graph.removeEdge(node, processorNode);
            processorNode.removeParentStreams(node);
            addNode(processorNode2, processorNode);
            this.curGroup.add(processorNode2);
        }
        processCurGroup(topologyBuilder);
    }

    private void mayBeAddTsField() {
        if (this.timestampFieldName != null) {
            Iterator<StreamBolt> it = this.streamBolts.keySet().iterator();
            while (it.hasNext()) {
                it.next().setTimestampField(this.timestampFieldName);
            }
        }
    }

    private void updateNodeGroupingInfo(PartitionNode partitionNode) {
        if (partitionNode.getGroupingInfo() != null) {
            for (Node node : parentNodes(partitionNode)) {
                Iterator<String> it = partitionNode.getParentStreams(node).iterator();
                while (it.hasNext()) {
                    this.nodeGroupingInfo.put(node, it.next(), partitionNode.getGroupingInfo());
                }
            }
        }
    }

    private void updateWindowInfo(WindowNode windowNode) {
        Iterator<Node> it = parentNodes(windowNode).iterator();
        while (it.hasNext()) {
            this.windowInfo.put(it.next(), windowNode);
        }
        String timestampField = windowNode.getWindowParams().getTimestampField();
        if (timestampField != null) {
            if (this.timestampFieldName != null && !timestampField.equals(this.timestampFieldName)) {
                throw new IllegalArgumentException("Cannot set different timestamp field names");
            }
            this.timestampFieldName = timestampField;
        }
    }

    Node parentNode(Node node) {
        Set<Node> parentNodes = parentNodes(node);
        if (parentNodes.size() > 1) {
            throw new IllegalArgumentException("Node " + node + " has more than one parent node.");
        }
        if (parentNodes.isEmpty()) {
            throw new IllegalArgumentException("Node " + node + " has no parent.");
        }
        return parentNodes.iterator().next();
    }

    private Set<Node> parentNodes(Node node) {
        HashSet hashSet = new HashSet();
        for (Node node2 : StreamUtil.getParents(this.graph, node)) {
            if ((node2 instanceof ProcessorNode) || (node2 instanceof SpoutNode)) {
                hashSet.add(node2);
            } else {
                hashSet.addAll(parentNodes(node2));
            }
        }
        return hashSet;
    }

    private Collection<List<ProcessorNode>> parallelismGroups(List<ProcessorNode> list) {
        return ((Map) list.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getParallelism();
        }))).values();
    }

    private void processCurGroup(TopologyBuilder topologyBuilder) {
        if (this.curGroup.isEmpty()) {
            return;
        }
        parallelismGroups(this.curGroup).forEach(list -> {
            doProcessCurGroup(topologyBuilder, list);
        });
        this.curGroup.clear();
    }

    private void doProcessCurGroup(TopologyBuilder topologyBuilder, List<ProcessorNode> list) {
        String uniqueBoltId = UniqueIdGen.getInstance().getUniqueBoltId();
        for (ProcessorNode processorNode : list) {
            processorNode.setComponentId(uniqueBoltId);
            processorNode.setWindowedParentStreams(getWindowedParentStreams(processorNode));
        }
        Set<ProcessorNode> initialProcessors = initialProcessors(list);
        Set<Window<?, ?>> windowParams = getWindowParams(initialProcessors);
        if (!windowParams.isEmpty()) {
            if (windowParams.size() != 1) {
                throw new IllegalStateException("More than one window config for current group " + list);
            }
            addWindowedBolt(topologyBuilder, uniqueBoltId, initialProcessors, windowParams.iterator().next(), list);
        } else if (hasStatefulProcessor(list)) {
            addStatefulBolt(topologyBuilder, uniqueBoltId, initialProcessors, list);
        } else {
            addBolt(topologyBuilder, uniqueBoltId, initialProcessors, list);
        }
    }

    private boolean hasStatefulProcessor(List<ProcessorNode> list) {
        Iterator<ProcessorNode> it = list.iterator();
        while (it.hasNext()) {
            if (it.next().getProcessor() instanceof StatefulProcessor) {
                return true;
            }
        }
        return false;
    }

    private int getParallelism(List<ProcessorNode> list) {
        Set set = (Set) list.stream().map((v0) -> {
            return v0.getParallelism();
        }).collect(Collectors.toSet());
        if (set.size() > 1) {
            throw new IllegalStateException("Current group does not have same parallelism " + list);
        }
        if (set.isEmpty()) {
            return 1;
        }
        return ((Integer) set.iterator().next()).intValue();
    }

    private Set<Window<?, ?>> getWindowParams(Set<ProcessorNode> set) {
        HashSet hashSet = new HashSet();
        Iterator<ProcessorNode> it = set.iterator();
        while (it.hasNext()) {
            for (Node node : parentNodes(it.next())) {
                if (this.windowInfo.containsKey(node)) {
                    hashSet.add(this.windowInfo.get(node));
                }
            }
        }
        return (Set) hashSet.stream().map((v0) -> {
            return v0.getWindowParams();
        }).collect(Collectors.toSet());
    }

    private void addSpout(TopologyBuilder topologyBuilder, SpoutNode spoutNode) {
        topologyBuilder.setSpout(spoutNode.getComponentId(), spoutNode.getSpout(), Integer.valueOf(spoutNode.getParallelism()));
    }

    private void addSink(TopologyBuilder topologyBuilder, SinkNode sinkNode) {
        BoltDeclarer bolt;
        IComponent bolt2 = sinkNode.getBolt();
        if (bolt2 instanceof IRichBolt) {
            bolt = topologyBuilder.setBolt(sinkNode.getComponentId(), (IRichBolt) bolt2, Integer.valueOf(sinkNode.getParallelism()));
        } else {
            if (!(bolt2 instanceof IBasicBolt)) {
                throw new IllegalArgumentException("Expect IRichBolt or IBasicBolt in addBolt");
            }
            bolt = topologyBuilder.setBolt(sinkNode.getComponentId(), (IBasicBolt) bolt2, Integer.valueOf(sinkNode.getParallelism()));
        }
        for (Node node : parentNodes(sinkNode)) {
            for (String str : sinkNode.getParentStreams(node)) {
                declareGrouping(bolt, node, str, this.nodeGroupingInfo.get(node, str));
            }
        }
    }

    private StreamBolt addBolt(TopologyBuilder topologyBuilder, String str, Set<ProcessorNode> set, List<ProcessorNode> list) {
        ProcessorBolt processorBolt = new ProcessorBolt(str, this.graph, list);
        BoltDeclarer bolt = topologyBuilder.setBolt(str, processorBolt, Integer.valueOf(getParallelism(list)));
        processorBolt.setStreamToInitialProcessors(wireBolt(list, bolt, set));
        this.streamBolts.put(processorBolt, bolt);
        return processorBolt;
    }

    private StreamBolt addStatefulBolt(TopologyBuilder topologyBuilder, String str, Set<ProcessorNode> set, List<ProcessorNode> list) {
        StatefulProcessorBolt<?, ?> findStatefulProcessorBolt;
        StateQueryProcessor<?, ?> stateQueryProcessor = getStateQueryProcessor(list);
        if (stateQueryProcessor == null) {
            findStatefulProcessorBolt = new StatefulProcessorBolt<>(str, this.graph, list);
            BoltDeclarer bolt = topologyBuilder.setBolt(str, findStatefulProcessorBolt, Integer.valueOf(getParallelism(list)));
            findStatefulProcessorBolt.setStreamToInitialProcessors(wireBolt(list, bolt, set));
            this.streamBolts.put(findStatefulProcessorBolt, bolt);
        } else {
            findStatefulProcessorBolt = findStatefulProcessorBolt(stateQueryProcessor.getStreamState().getUpdateStateNode());
            Iterator<ProcessorNode> it = list.iterator();
            while (it.hasNext()) {
                it.next().setComponentId(findStatefulProcessorBolt.getId());
            }
            findStatefulProcessorBolt.addNodes(list);
            findStatefulProcessorBolt.addStreamToInitialProcessors(wireBolt(findStatefulProcessorBolt.getNodes(), this.streamBolts.get(findStatefulProcessorBolt), set));
        }
        return findStatefulProcessorBolt;
    }

    private StateQueryProcessor<?, ?> getStateQueryProcessor(List<ProcessorNode> list) {
        for (ProcessorNode processorNode : list) {
            if (processorNode.getProcessor() instanceof StateQueryProcessor) {
                return (StateQueryProcessor) processorNode.getProcessor();
            }
        }
        return null;
    }

    private StreamBolt addWindowedBolt(TopologyBuilder topologyBuilder, String str, Set<ProcessorNode> set, Window<?, ?> window, List<ProcessorNode> list) {
        WindowedProcessorBolt windowedProcessorBolt = new WindowedProcessorBolt(str, this.graph, list, window);
        BoltDeclarer bolt = topologyBuilder.setBolt(str, windowedProcessorBolt, Integer.valueOf(getParallelism(list)));
        windowedProcessorBolt.setStreamToInitialProcessors(wireBolt(list, bolt, set));
        this.streamBolts.put(windowedProcessorBolt, bolt);
        return windowedProcessorBolt;
    }

    private StatefulProcessorBolt<?, ?> findStatefulProcessorBolt(ProcessorNode processorNode) {
        for (StreamBolt streamBolt : this.streamBolts.keySet()) {
            if (streamBolt instanceof StatefulProcessorBolt) {
                StatefulProcessorBolt<?, ?> statefulProcessorBolt = (StatefulProcessorBolt) streamBolt;
                if (statefulProcessorBolt.getNodes().contains(processorNode)) {
                    return statefulProcessorBolt;
                }
            }
        }
        throw new IllegalArgumentException("Could not find Stateful bolt for node " + processorNode);
    }

    private Set<String> getWindowedParentStreams(ProcessorNode processorNode) {
        HashSet hashSet = new HashSet();
        for (Node node : parentNodes(processorNode)) {
            if ((node instanceof ProcessorNode) && node.isWindowed()) {
                hashSet.addAll(node.getOutputStreams());
            }
        }
        return hashSet;
    }

    private Multimap<String, ProcessorNode> wireBolt(List<ProcessorNode> list, BoltDeclarer boltDeclarer, Set<ProcessorNode> set) {
        LOG.debug("Wiring bolt with boltDeclarer {}, group {}, initialProcessors {}, nodeGroupingInfo {}", boltDeclarer, list, set, this.nodeGroupingInfo);
        ArrayListMultimap create = ArrayListMultimap.create();
        HashSet hashSet = new HashSet(list);
        for (ProcessorNode processorNode : set) {
            for (Node node : parentNodes(processorNode)) {
                if (hashSet.contains(node)) {
                    LOG.debug("Parent {} of curNode {} is in group {}", node, processorNode, list);
                } else {
                    for (String str : processorNode.getParentStreams(node)) {
                        declareGrouping(boltDeclarer, node, str, this.nodeGroupingInfo.get(node, str));
                        if (node.getComponentId().startsWith(ClientStatsUtil.SPOUT)) {
                            str = node.getComponentId() + str;
                        } else {
                            declareGrouping(boltDeclarer, node, StreamUtil.getPunctuationStream(str), GroupingInfo.all());
                        }
                        create.put(str, processorNode);
                    }
                }
            }
        }
        return create;
    }

    private void declareGrouping(BoltDeclarer boltDeclarer, Node node, String str, GroupingInfo groupingInfo) {
        if (groupingInfo == null) {
            boltDeclarer.shuffleGrouping(node.getComponentId(), str);
        } else {
            groupingInfo.declareGrouping(boltDeclarer, node.getComponentId(), str, groupingInfo.getFields());
        }
    }

    private Set<ProcessorNode> initialProcessors(List<ProcessorNode> list) {
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet(list);
        for (ProcessorNode processorNode : list) {
            for (Node node : parentNodes(processorNode)) {
                if (!(node instanceof ProcessorNode) || !hashSet2.contains(node)) {
                    hashSet.add(processorNode);
                }
            }
        }
        return hashSet;
    }
}
