package org.apache.storm.trident.topology;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.SharedMemory;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.grouping.PartialKeyGrouping;
import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringStyle;
import org.apache.storm.topology.BaseConfigurationDeclarer;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.InputDeclarer;
import org.apache.storm.topology.SpoutDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.trident.spout.BatchSpoutExecutor;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.trident.spout.ICommitterTridentSpout;
import org.apache.storm.trident.spout.ITridentSpout;
import org.apache.storm.trident.spout.RichSpoutBatchTriggerer;
import org.apache.storm.trident.spout.TridentSpoutCoordinator;
import org.apache.storm.trident.spout.TridentSpoutExecutor;
import org.apache.storm.trident.topology.TridentBoltExecutor;
import org.apache.storm.tuple.Fields;

/* loaded from: input_file:org/apache/storm/trident/topology/TridentTopologyBuilder.class */
public class TridentTopologyBuilder {
    /** Prefix placed in front of a spout id to form the id of that spout's coordinator component. */
    static final String SPOUT_COORD_PREFIX = "$spoutcoord-";
    /** Batch group id for each stream known to this builder. */
    Map<GlobalStreamId, String> batchIds = new HashMap<>();
    /** Spouts registered through {@code setSpout}, keyed by component id. */
    Map<String, TransactionalSpoutComponent> spouts = new HashMap<>();
    /** Spouts registered through {@code setBatchPerTupleSpout}, keyed by component id. */
    Map<String, SpoutComponent> batchPerTupleSpouts = new HashMap<>();
    /** Bolts registered through {@code setBolt}, keyed by component id. */
    Map<String, Component> bolts = new HashMap<>();

    /* loaded from: input_file:org/apache/storm/trident/topology/TridentTopologyBuilder$BoltDeclarerImpl.class */
    /**
     * {@link BoltDeclarer} that does not apply groupings directly. Each grouping call is captured as
     * an {@link InputDeclaration} and appended to the wrapped {@link Component}; the captured
     * declaration can later be applied to any {@link InputDeclarer} through
     * {@link InputDeclaration#declare(InputDeclarer)}.
     *
     * <p>Declarations created by the single-argument grouping overloads report {@code null} from
     * {@link InputDeclaration#getStream()}, because no stream id was supplied by the caller.
     */
    private static class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
        /** Component that receives every recorded declaration, configuration and shared-memory request. */
        Component component;

        BoltDeclarerImpl(Component component) {
            this.component = component;
        }

        /** Records a fields grouping on component {@code str} with no explicit stream id. */
        @Override
        public BoltDeclarer fieldsGrouping(final String str, final Fields fields) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.fieldsGrouping(str, fields);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return null;
                }
            });
            return this;
        }

        /** Records a fields grouping on stream {@code str2} of component {@code str}. */
        @Override
        public BoltDeclarer fieldsGrouping(final String str, final String str2, final Fields fields) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.fieldsGrouping(str, str2, fields);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return str2;
                }
            });
            return this;
        }

        /** Records a global grouping on component {@code str} with no explicit stream id. */
        @Override
        public BoltDeclarer globalGrouping(final String str) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.globalGrouping(str);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return null;
                }
            });
            return this;
        }

        /** Records a global grouping on stream {@code str2} of component {@code str}. */
        @Override
        public BoltDeclarer globalGrouping(final String str, final String str2) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.globalGrouping(str, str2);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return str2;
                }
            });
            return this;
        }

        /** Records a shuffle grouping on component {@code str} with no explicit stream id. */
        @Override
        public BoltDeclarer shuffleGrouping(final String str) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.shuffleGrouping(str);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return null;
                }
            });
            return this;
        }

        /** Records a shuffle grouping on stream {@code str2} of component {@code str}. */
        @Override
        public BoltDeclarer shuffleGrouping(final String str, final String str2) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.shuffleGrouping(str, str2);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return str2;
                }
            });
            return this;
        }

        /** Records a local-or-shuffle grouping on component {@code str} with no explicit stream id. */
        @Override
        public BoltDeclarer localOrShuffleGrouping(final String str) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.localOrShuffleGrouping(str);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return null;
                }
            });
            return this;
        }

        /** Records a local-or-shuffle grouping on stream {@code str2} of component {@code str}. */
        @Override
        public BoltDeclarer localOrShuffleGrouping(final String str, final String str2) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.localOrShuffleGrouping(str, str2);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return str2;
                }
            });
            return this;
        }

        /** Records a none grouping on component {@code str} with no explicit stream id. */
        @Override
        public BoltDeclarer noneGrouping(final String str) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.noneGrouping(str);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return null;
                }
            });
            return this;
        }

        /** Records a none grouping on stream {@code str2} of component {@code str}. */
        @Override
        public BoltDeclarer noneGrouping(final String str, final String str2) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.noneGrouping(str, str2);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return str2;
                }
            });
            return this;
        }

        /** Records an all grouping on component {@code str} with no explicit stream id. */
        @Override
        public BoltDeclarer allGrouping(final String str) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.allGrouping(str);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return null;
                }
            });
            return this;
        }

        /** Records an all grouping on stream {@code str2} of component {@code str}. */
        @Override
        public BoltDeclarer allGrouping(final String str, final String str2) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.allGrouping(str, str2);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return str2;
                }
            });
            return this;
        }

        /** Records a direct grouping on component {@code str} with no explicit stream id. */
        @Override
        public BoltDeclarer directGrouping(final String str) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.directGrouping(str);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return null;
                }
            });
            return this;
        }

        /** Records a direct grouping on stream {@code str2} of component {@code str}. */
        @Override
        public BoltDeclarer directGrouping(final String str, final String str2) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.directGrouping(str, str2);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return str2;
                }
            });
            return this;
        }

        /**
         * Records a partial-key grouping on component {@code str}; implemented as a custom grouping
         * backed by a new {@link PartialKeyGrouping} over {@code fields}.
         */
        @Override
        public BoltDeclarer partialKeyGrouping(String str, Fields fields) {
            return customGrouping(str, (CustomStreamGrouping) new PartialKeyGrouping(fields));
        }

        /**
         * Records a partial-key grouping on stream {@code str2} of component {@code str}; implemented
         * as a custom grouping backed by a new {@link PartialKeyGrouping} over {@code fields}.
         */
        @Override
        public BoltDeclarer partialKeyGrouping(String str, String str2, Fields fields) {
            return customGrouping(str, str2, (CustomStreamGrouping) new PartialKeyGrouping(fields));
        }

        /** Records a custom grouping on component {@code str} with no explicit stream id. */
        @Override
        public BoltDeclarer customGrouping(final String str, final CustomStreamGrouping customStreamGrouping) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.customGrouping(str, customStreamGrouping);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return null;
                }
            });
            return this;
        }

        /** Records a custom grouping on stream {@code str2} of component {@code str}. */
        @Override
        public BoltDeclarer customGrouping(final String str, final String str2, final CustomStreamGrouping customStreamGrouping) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.customGrouping(str, str2, customStreamGrouping);
                }

                @Override
                public String getComponent() {
                    return str;
                }

                @Override
                public String getStream() {
                    return str2;
                }
            });
            return this;
        }

        /**
         * Records an arbitrary grouping for the given stream. The component and stream reported by
         * the recorded declaration are read from {@code globalStreamId} when they are queried.
         */
        @Override
        public BoltDeclarer grouping(final GlobalStreamId globalStreamId, final Grouping grouping) {
            addDeclaration(new InputDeclaration() {
                @Override
                public void declare(InputDeclarer inputDeclarer) {
                    inputDeclarer.grouping(globalStreamId, grouping);
                }

                @Override
                public String getComponent() {
                    return globalStreamId.get_componentId();
                }

                @Override
                public String getStream() {
                    return globalStreamId.get_streamId();
                }
            });
            return this;
        }

        /** Appends a recorded declaration to the wrapped component. */
        private void addDeclaration(InputDeclaration inputDeclaration) {
            this.component.declarations.add(inputDeclaration);
        }

        /**
         * Copies all entries of {@code map} into the component configuration.
         *
         * @param map configuration entries to add; {@code null} is ignored
         * @return this declarer
         */
        @Override
        public BoltDeclarer addConfigurations(Map<String, Object> map) {
            if (map != null) {
                this.component.componentConf.putAll(map);
            }
            return this;
        }

        /** Returns the live (mutable) configuration map of the wrapped component. */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return this.component.componentConf;
        }

        /** Adds a shared-memory request to the wrapped component. */
        @Override
        public BoltDeclarer addSharedMemory(SharedMemory sharedMemory) {
            this.component.sharedMemory.add(sharedMemory);
            return this;
        }

        // The decompiled synthetic bridge for addConfigurations(Map) was dropped on purpose: it has the
        // same erasure as addConfigurations(Map<String, Object>) and cannot be declared in source. The
        // compiler generates the bridge again automatically.
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/trident/topology/TridentTopologyBuilder$Component.class */
    /**
     * Everything recorded for one bolt: the bolt itself, its parallelism, the committer batches
     * passed at registration, plus the input declarations, configuration and shared-memory
     * requests accumulated afterwards.
     */
    public static class Component {
        public final ITridentBatchBolt bolt;
        public final Integer parallelism;
        public final Set<String> committerBatches;
        /** Recorded grouping declarations, in the order they were added. */
        public final List<InputDeclaration> declarations = new ArrayList<>();
        /** Component-level configuration entries. */
        public final Map<String, Object> componentConf = new HashMap<>();
        /** Shared-memory requests attached to the component. */
        public final Set<SharedMemory> sharedMemory = new HashSet<>();

        Component(ITridentBatchBolt iTridentBatchBolt, Integer num, Set<String> set) {
            this.bolt = iTridentBatchBolt;
            this.parallelism = num;
            this.committerBatches = set;
        }

        /** Returns a reflective, multi-line dump of all fields. */
        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/trident/topology/TridentTopologyBuilder$InputDeclaration.class */
    /**
     * A recorded input subscription: it knows which component (and optionally which stream) it
     * refers to and can apply itself to an {@link InputDeclarer}.
     */
    public interface InputDeclaration {
        /** Returns the id of the component this declaration subscribes to. */
        String getComponent();

        /** Returns the id of the subscribed stream; implementations may return {@code null}. */
        String getStream();

        /** Applies this declaration to the given declarer. */
        void declare(InputDeclarer inputDeclarer);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/trident/topology/TridentTopologyBuilder$SpoutComponent.class */
    /**
     * Everything recorded for one spout: the spout object, the stream name, parallelism and batch
     * group id given at registration, plus configuration and shared-memory requests added later.
     */
    public static class SpoutComponent {
        final Object spout;
        final Integer parallelism;
        final String batchGroupId;
        final String streamName;
        /** Component-level configuration entries. */
        final Map<String, Object> componentConf = new HashMap<>();
        /** Shared-memory requests attached to the component. */
        final Set<SharedMemory> sharedMemory = new HashSet<>();

        /**
         * @param obj the spout instance
         * @param str stream name
         * @param num parallelism
         * @param str2 batch group id
         */
        SpoutComponent(Object obj, String str, Integer num, String str2) {
            this.spout = obj;
            this.streamName = str;
            this.parallelism = num;
            this.batchGroupId = str2;
        }

        /** Returns a reflective dump of all fields in the default style. */
        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/trident/topology/TridentTopologyBuilder$SpoutDeclarerImpl.class */
    /**
     * {@link SpoutDeclarer} that stores configuration entries and shared-memory requests on the
     * wrapped {@link SpoutComponent}.
     */
    public static class SpoutDeclarerImpl extends BaseConfigurationDeclarer<SpoutDeclarer> implements SpoutDeclarer {
        /** Component that receives every configuration and shared-memory request. */
        SpoutComponent component;

        SpoutDeclarerImpl(SpoutComponent spoutComponent) {
            this.component = spoutComponent;
        }

        /**
         * Copies all entries of {@code map} into the component configuration.
         *
         * @param map configuration entries to add; {@code null} is ignored
         * @return this declarer
         */
        @Override
        public SpoutDeclarer addConfigurations(Map<String, Object> map) {
            if (map != null) {
                this.component.componentConf.putAll(map);
            }
            return this;
        }

        /** Returns the live (mutable) configuration map of the wrapped component. */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return this.component.componentConf;
        }

        /** Adds a shared-memory request to the wrapped component. */
        @Override
        public SpoutDeclarer addSharedMemory(SharedMemory sharedMemory) {
            this.component.sharedMemory.add(sharedMemory);
            return this;
        }

        // The decompiled synthetic bridge for addConfigurations(Map) was dropped on purpose: it has the
        // same erasure as addConfigurations(Map<String, Object>) and cannot be declared in source. The
        // compiler generates the bridge again automatically.
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/trident/topology/TridentTopologyBuilder$TransactionalSpoutComponent.class */
    /** A {@link SpoutComponent} that additionally carries a commit state id. */
    public static class TransactionalSpoutComponent extends SpoutComponent {
        public String commitStateId;

        /**
         * @param obj the spout instance
         * @param str stream name
         * @param num parallelism
         * @param str2 commit state id
         * @param str3 batch group id
         */
        TransactionalSpoutComponent(Object obj, String str, Integer num, String str2, String str3) {
            super(obj, str, num, str3);
            commitStateId = str2;
        }

        /** Returns a reflective, multi-line dump of all fields. */
        @Override
        public String toString() {
            final ToStringStyle layout = ToStringStyle.MULTI_LINE_STYLE;
            return ToStringBuilder.reflectionToString(this, layout);
        }
    }

    /**
     * Builds the coordinator component id for a spout by prefixing the spout id with
     * {@link #SPOUT_COORD_PREFIX}.
     *
     * @param str the spout id
     * @return the prefixed id
     */
    public static String spoutCoordinator(String str) {
        final StringBuilder coordinatorId = new StringBuilder(SPOUT_COORD_PREFIX);
        coordinatorId.append(str);
        return coordinatorId.toString();
    }

    /**
     * Recovers a spout id from a coordinator id by dropping as many leading characters as
     * {@link #SPOUT_COORD_PREFIX} is long. The argument is not checked for actually starting with
     * the prefix.
     *
     * @param str the coordinator id
     * @return the remainder of {@code str} after the prefix length
     */
    public static String spoutIdFromCoordinatorId(String str) {
        final int prefixLength = SPOUT_COORD_PREFIX.length();
        return str.substring(prefixLength);
    }

    /**
     * Registers a rich spout in the batch-per-tuple registry.
     *
     * @param str component id of the spout
     * @param str2 stream name
     * @param iRichSpout the spout to register
     * @param num parallelism
     * @param str3 batch group id associated with the stream
     * @return a declarer bound to the newly registered spout component
     */
    public SpoutDeclarer setBatchPerTupleSpout(String str, String str2, IRichSpout iRichSpout, Integer num, String str3) {
        // Single-entry mapping: stream name -> batch group id.
        Map<String, String> batchGroups = new HashMap<>();
        batchGroups.put(str2, str3);
        markBatchGroups(str, batchGroups);
        SpoutComponent spoutComponent = new SpoutComponent(iRichSpout, str2, num, str3);
        this.batchPerTupleSpouts.put(str, spoutComponent);
        return new SpoutDeclarerImpl(spoutComponent);
    }

    /**
     * Registers a batch spout by wrapping it in a {@link BatchSpoutExecutor} and delegating to the
     * {@link ITridentSpout} overload with otherwise unchanged arguments.
     */
    public SpoutDeclarer setSpout(String str, String str2, String str3, IBatchSpout iBatchSpout, Integer num, String str4) {
        final BatchSpoutExecutor wrapped = new BatchSpoutExecutor(iBatchSpout);
        return setSpout(str, str2, str3, wrapped, num, str4);
    }

    /**
     * Registers a Trident spout in the transactional spout registry.
     *
     * @param str component id of the spout
     * @param str2 stream name
     * @param str3 commit state id
     * @param iTridentSpout the spout to register
     * @param num parallelism
     * @param str4 batch group id associated with the stream
     * @return a declarer bound to the newly registered spout component
     */
    public SpoutDeclarer setSpout(String str, String str2, String str3, ITridentSpout iTridentSpout, Integer num, String str4) {
        // Single-entry mapping: stream name -> batch group id.
        Map<String, String> batchGroups = new HashMap<>();
        batchGroups.put(str2, str4);
        markBatchGroups(str, batchGroups);
        TransactionalSpoutComponent transactionalSpoutComponent = new TransactionalSpoutComponent(iTridentSpout, str2, num, str3, str4);
        this.spouts.put(str, transactionalSpoutComponent);
        return new SpoutDeclarerImpl(transactionalSpoutComponent);
    }

    /**
     * Registers a batch bolt under the given component id.
     *
     * @param str component id of the bolt
     * @param iTridentBatchBolt the bolt to register
     * @param num parallelism
     * @param set committer batches stored on the component
     * @param map mapping handed to {@code markBatchGroups} together with the component id
     * @return a declarer that records groupings and configuration on the new component
     */
    public BoltDeclarer setBolt(String str, ITridentBatchBolt iTridentBatchBolt, Integer num, Set<String> set, Map<String, String> map) {
        markBatchGroups(str, map);
        final Component registered = new Component(iTridentBatchBolt, num, set);
        final BoltDeclarer declarer = new BoltDeclarerImpl(registered);
        this.bolts.put(str, registered);
        return declarer;
    }

    /**
     * Builds the master coordinator component id for a batch group by prefixing it with
     * {@code "$mastercoord-"}.
     *
     * @param str the batch group id
     * @return the prefixed id
     */
    String masterCoordinator(String str) {
        final StringBuilder coordinatorId = new StringBuilder("$mastercoord-");
        coordinatorId.append(str);
        return coordinatorId.toString();
    }

    /**
     * Returns a copy of {@link #batchIds} extended with the coordination streams of every batch
     * group.
     *
     * <p>Added on top of the copied entries:
     * <ul>
     *   <li>for each distinct batch group: the master coordinator's batch stream and, when
     *       {@code z} is set, its commit stream;</li>
     *   <li>for each registered spout with a non-null batch group: the batch stream of that
     *       spout's coordinator;</li>
     *   <li>for each stream in {@link #batchIds}: the coordination stream of the emitting
     *       component, as named by {@code TridentBoltExecutor.coordStream}.</li>
     * </ul>
     *
     * @param z whether the master coordinators' commit streams are included
     * @return a new map; {@link #batchIds} itself is not modified
     */
    Map<GlobalStreamId, String> fleshOutStreamBatchIds(boolean z) {
        Map<GlobalStreamId, String> result = new HashMap<>(this.batchIds);

        // De-duplicate batch groups so each master coordinator is handled once.
        Set<String> batchGroups = new HashSet<>(this.batchIds.values());
        for (String batchGroup : batchGroups) {
            String masterId = masterCoordinator(batchGroup);
            result.put(new GlobalStreamId(masterId, MasterBatchCoordinator.BATCH_STREAM_ID), batchGroup);
            if (z) {
                result.put(new GlobalStreamId(masterId, MasterBatchCoordinator.COMMIT_STREAM_ID), batchGroup);
            }
        }

        for (Map.Entry<String, TransactionalSpoutComponent> spoutEntry : this.spouts.entrySet()) {
            String batchGroup = spoutEntry.getValue().batchGroupId;
            if (batchGroup != null) {
                result.put(new GlobalStreamId(spoutCoordinator(spoutEntry.getKey()), MasterBatchCoordinator.BATCH_STREAM_ID), batchGroup);
            }
        }

        for (Map.Entry<GlobalStreamId, String> streamEntry : this.batchIds.entrySet()) {
            String batchGroup = streamEntry.getValue();
            result.put(new GlobalStreamId(streamEntry.getKey().get_componentId(), TridentBoltExecutor.coordStream(batchGroup)), batchGroup);
        }
        return result;
    }

    public StormTopology buildTopology(Map<String, Number> map) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        Map<GlobalStreamId, String> fleshOutStreamBatchIds = fleshOutStreamBatchIds(false);
        Map<GlobalStreamId, String> fleshOutStreamBatchIds2 = fleshOutStreamBatchIds(true);
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (String str : this.spouts.keySet()) {
            TransactionalSpoutComponent transactionalSpoutComponent = this.spouts.get(str);
            if (transactionalSpoutComponent.spout instanceof IRichSpout) {
                topologyBuilder.setSpout(str, (IRichSpout) transactionalSpoutComponent.spout, transactionalSpoutComponent.parallelism);
            } else {
                String str2 = transactionalSpoutComponent.batchGroupId;
                if (!hashMap.containsKey(str2)) {
                    hashMap.put(str2, new ArrayList());
                }
                ((List) hashMap.get(str2)).add(transactionalSpoutComponent.commitStateId);
                if (!hashMap2.containsKey(str2)) {
                    hashMap2.put(str2, new ArrayList());
                }
                ((List) hashMap2.get(str2)).add((ITridentSpout) transactionalSpoutComponent.spout);
                BoltDeclarer globalGrouping = topologyBuilder.setBolt(spoutCoordinator(str), new TridentSpoutCoordinator(transactionalSpoutComponent.commitStateId, (ITridentSpout) transactionalSpoutComponent.spout)).globalGrouping(masterCoordinator(transactionalSpoutComponent.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID).globalGrouping(masterCoordinator(transactionalSpoutComponent.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                Iterator<SharedMemory> it = transactionalSpoutComponent.sharedMemory.iterator();
                while (it.hasNext()) {
                    globalGrouping.addSharedMemory(it.next());
                }
                globalGrouping.addConfigurations(transactionalSpoutComponent.componentConf);
                HashMap hashMap3 = new HashMap();
                hashMap3.put(transactionalSpoutComponent.batchGroupId, new TridentBoltExecutor.CoordSpec());
                BoltDeclarer bolt = topologyBuilder.setBolt(str, new TridentBoltExecutor(new TridentSpoutExecutor(transactionalSpoutComponent.commitStateId, transactionalSpoutComponent.streamName, (ITridentSpout) transactionalSpoutComponent.spout), fleshOutStreamBatchIds, hashMap3), transactionalSpoutComponent.parallelism);
                bolt.allGrouping(spoutCoordinator(str), MasterBatchCoordinator.BATCH_STREAM_ID);
                bolt.allGrouping(masterCoordinator(str2), MasterBatchCoordinator.SUCCESS_STREAM_ID);
                if (transactionalSpoutComponent.spout instanceof ICommitterTridentSpout) {
                    bolt.allGrouping(masterCoordinator(str2), MasterBatchCoordinator.COMMIT_STREAM_ID);
                }
                bolt.addConfigurations(transactionalSpoutComponent.componentConf);
            }
        }
        for (String str3 : this.batchPerTupleSpouts.keySet()) {
            SpoutComponent spoutComponent = this.batchPerTupleSpouts.get(str3);
            topologyBuilder.setSpout(str3, new RichSpoutBatchTriggerer((IRichSpout) spoutComponent.spout, spoutComponent.streamName, spoutComponent.batchGroupId), spoutComponent.parallelism).addConfigurations(spoutComponent.componentConf);
        }
        Number number = map.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
        Number number2 = map.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
        Number number3 = map.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
        for (String str4 : hashMap.keySet()) {
            SpoutDeclarer spout = topologyBuilder.setSpout(masterCoordinator(str4), new MasterBatchCoordinator((List) hashMap.get(str4), (List) hashMap2.get(str4)));
            if (number != null) {
                if (number2 != null) {
                    spout.setMemoryLoad(number, number2);
                } else {
                    spout.setMemoryLoad(number);
                }
            }
            if (number3 != null) {
                spout.setCPULoad(number3);
            }
        }
        for (String str5 : this.bolts.keySet()) {
            Component component = this.bolts.get(str5);
            HashMap hashMap4 = new HashMap();
            for (GlobalStreamId globalStreamId : getBoltSubscriptionStreams(str5)) {
                String str6 = fleshOutStreamBatchIds2.get(globalStreamId);
                if (!hashMap4.containsKey(str6)) {
                    hashMap4.put(str6, new TridentBoltExecutor.CoordSpec());
                }
                ((TridentBoltExecutor.CoordSpec) hashMap4.get(str6)).coords.put(globalStreamId.get_componentId(), this.batchPerTupleSpouts.containsKey(globalStreamId.get_componentId()) ? TridentBoltExecutor.CoordType.single() : TridentBoltExecutor.CoordType.all());
            }
            for (String str7 : component.committerBatches) {
                ((TridentBoltExecutor.CoordSpec) hashMap4.get(str7)).commitStream = new GlobalStreamId(masterCoordinator(str7), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
            BoltDeclarer bolt2 = topologyBuilder.setBolt(str5, new TridentBoltExecutor(component.bolt, fleshOutStreamBatchIds2, hashMap4), component.parallelism);
            Iterator<SharedMemory> it2 = component.sharedMemory.iterator();
            while (it2.hasNext()) {
                bolt2.addSharedMemory(it2.next());
            }
            bolt2.addConfigurations(component.componentConf);
            Iterator<InputDeclaration> it3 = component.declarations.iterator();
            while (it3.hasNext()) {
                it3.next().declare(bolt2);
            }
            for (Map.Entry<String, Set<String>> entry : getBoltBatchToComponentSubscriptions(str5).entrySet()) {
                Iterator<String> it4 = entry.getValue().iterator();
                while (it4.hasNext()) {
                    bolt2.directGrouping(it4.next(), TridentBoltExecutor.coordStream(entry.getKey()));
                }
            }
            Iterator<String> it5 = component.committerBatches.iterator();
            while (it5.hasNext()) {
                bolt2.allGrouping(masterCoordinator(it5.next()), MasterBatchCoordinator.COMMIT_STREAM_ID);
            }
        }
        return topologyBuilder.createTopology();
    }

    /**
     * Records, in {@link #batchIds}, one entry per stream listed in {@code map}.
     *
     * <p>Each key of {@code map} is combined with {@code str} into a
     * {@link GlobalStreamId} (component id first, stream id second), and the
     * corresponding value of {@code map} is stored under that id. An entry that
     * already exists for the same global stream id is overwritten.
     *
     * @param str component id used as the first part of every generated {@link GlobalStreamId}
     * @param map stream id to batch id mapping to register for that component
     */
    private void markBatchGroups(String str, Map<String, String> map) {
        map.forEach((streamId, batchId) -> {
            GlobalStreamId globalId = new GlobalStreamId(str, streamId);
            this.batchIds.put(globalId, batchId);
        });
    }

    /**
     * Groups the components a bolt subscribes to by the batch id of each
     * subscribed stream.
     *
     * <p>For every stream returned by {@link #getBoltSubscriptionStreams(String)}
     * the batch id is looked up in {@link #batchIds}, and the stream's component
     * id is added to the set kept for that batch id.
     *
     * @param str id of the bolt whose subscriptions are inspected
     * @return a new map from batch id to the set of component ids subscribed to
     *         under that batch id; a stream with no entry in {@link #batchIds}
     *         is grouped under the {@code null} key
     */
    Map<String, Set<String>> getBoltBatchToComponentSubscriptions(String str) {
        Map<String, Set<String>> componentsByBatch = new HashMap<>();
        for (GlobalStreamId stream : getBoltSubscriptionStreams(str)) {
            // May be null when the stream was never registered; HashMap accepts a null key,
            // so such streams are still collected (under null) exactly as before.
            String batchId = this.batchIds.get(stream);
            componentsByBatch
                    .computeIfAbsent(batchId, ignored -> new HashSet<>())
                    .add(stream.get_componentId());
        }
        return componentsByBatch;
    }

    /**
     * Lists the streams that a bolt subscribes to.
     *
     * <p>Each input declaration of the bolt registered under {@code str} in
     * {@link #bolts} yields one {@link GlobalStreamId}, built from the
     * declaration's component and stream, in declaration order.
     *
     * @param str id of the bolt to inspect
     * @return a new list with one global stream id per input declaration
     * @throws NullPointerException if no bolt is registered under {@code str}
     */
    List<GlobalStreamId> getBoltSubscriptionStreams(String str) {
        List<GlobalStreamId> subscribedStreams = new ArrayList<>();
        for (InputDeclaration declaration : this.bolts.get(str).declarations) {
            subscribedStreams.add(new GlobalStreamId(declaration.getComponent(), declaration.getStream()));
        }
        return subscribedStreams;
    }
}
