package org.apache.flink.storm.api;

import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BasicBoltExecutor;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.SpoutDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.SplitStreamTypeKeySelector;
import org.apache.flink.storm.util.StormStreamSelector;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;

/* loaded from: input_file:org/apache/flink/storm/api/FlinkTopologyBuilder.class */
/**
 * Collects Storm spouts and bolts and translates them into a {@link FlinkTopology}.
 *
 * <p>Every registration is forwarded to a wrapped Storm {@link TopologyBuilder} and additionally
 * remembered locally. {@link #createTopology()} then wraps each spout in a {@link SpoutWrapper}
 * and each bolt in a {@link BoltWrapper} and wires them together as Flink data streams.
 *
 * <p>NOTE(review): a bolt is wrapped and transformed once per connected input stream, so a bolt
 * that subscribes to several streams yields several operators, and the streams registered for
 * that bolt are overwritten by the last one. Confirm whether multi-input bolts are expected to
 * be supported by callers.
 */
public class FlinkTopologyBuilder {
    /** Storm builder that receives every registration and produces the {@link StormTopology}. */
    private final TopologyBuilder stormBuilder = new TopologyBuilder();
    /** Registered spouts, keyed by component id. */
    private final HashMap<String, IRichSpout> spouts = new HashMap<>();
    /** Registered bolts, keyed by component id. */
    private final HashMap<String, IRichBolt> bolts = new HashMap<>();
    /** Declared output schema per component id and stream id; filled by {@link #createTopology()}. */
    private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<>();
    /** Output-field declarer per component id; filled by {@link #createTopology()}. */
    private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<>();
    /** Storm topology built by the most recent {@link #createTopology()} call. */
    private StormTopology stormTopology;

    /**
     * Builds the Storm topology from the wrapped builder and translates it into a Flink topology.
     *
     * <p>Spouts are translated first; bolts are then connected in repeated passes until every
     * bolt input has been wired to an already translated producer.
     *
     * @return the translated topology, created with a default parallelism of 1
     * @throws RuntimeException if a bolt subscribes to a stream its producer does not declare, or
     *     if a full pass over the remaining bolts connects nothing
     * @throws UnsupportedOperationException if an input uses a grouping other than
     *     (local-or-)shuffle, fields, all, or global
     */
    public FlinkTopology createTopology() {
        this.stormTopology = this.stormBuilder.createTopology();

        final FlinkTopology env = new FlinkTopology();
        env.setParallelism(1);

        // producer id -> (stream id -> Flink stream), for every component translated so far
        final Map<String, Map<String, DataStream>> availableStreams = new HashMap<>();
        translateSpouts(env, availableStreams);
        translateBolts(env, availableStreams);
        return env;
    }

    /**
     * Adds every registered spout as a Flink source and records its output streams.
     *
     * @param env topology the sources are added to
     * @param availableStreams receives, per spout id, the streams that bolts may consume
     */
    // Raw Flink stream types are used on purpose: element types are only known at runtime.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void translateSpouts(FlinkTopology env, Map<String, Map<String, DataStream>> availableStreams) {
        for (Map.Entry<String, IRichSpout> registration : this.spouts.entrySet()) {
            final String spoutId = registration.getKey();
            final IRichSpout spout = registration.getValue();

            final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
            spout.declareOutputFields(declarer);
            final HashMap<String, Fields> declaredStreams = declarer.outputStreams;
            this.outputStreams.put(spoutId, declaredStreams);
            this.declarers.put(spoutId, declarer);

            final SpoutWrapper wrapper = new SpoutWrapper(spout);
            wrapper.setStormTopology(this.stormTopology);

            final Map<String, DataStream> spoutStreams = new HashMap<>();
            final DataStreamSource source;
            if (declaredStreams.size() != 1) {
                // Zero or several declared streams: emit SplitStreamType and split by stream id.
                source = env.addSource(wrapper, spoutId, TypeExtractor.getForClass(SplitStreamType.class));
                final SplitStream split = source.split(new StormStreamSelector());
                for (String streamId : declaredStreams.keySet()) {
                    spoutStreams.put(streamId, split.select(new String[]{streamId}));
                }
            } else {
                // Exactly one declared stream: the source itself is that stream.
                final String streamId = declaredStreams.keySet().iterator().next();
                source = env.addSource(wrapper, spoutId, declarer.getOutputType(streamId));
                spoutStreams.put(streamId, source);
            }
            availableStreams.put(spoutId, spoutStreams);

            final ComponentCommon common = this.stormTopology.get_spouts().get(spoutId).get_common();
            final int parallelism;
            if (!common.is_set_parallelism_hint()) {
                // No hint given: record an explicit hint of 1 on the Storm component.
                common.set_parallelism_hint(1);
                parallelism = 1;
            } else {
                parallelism = common.get_parallelism_hint();
                source.setParallelism(parallelism);
            }
            env.increaseNumberOfTasks(parallelism);
        }
    }

    /**
     * Connects every registered bolt to its producers.
     *
     * <p>Map iteration order is unspecified, so a bolt may be visited before its producer has
     * been translated. The method therefore loops over the still unconnected bolts until none
     * remain, and fails if a complete pass finds no translated producer for any pending input.
     *
     * @param env topology whose task count is increased per connected input
     * @param availableStreams streams of already translated components; extended with the
     *     output streams of each bolt as it is connected
     */
    // Raw Flink stream types are used on purpose: element types are only known at runtime.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void translateBolts(FlinkTopology env, Map<String, Map<String, DataStream>> availableStreams) {
        final HashMap<String, IRichBolt> pendingBolts = new HashMap<>();
        pendingBolts.putAll(this.bolts);
        final HashMap<String, Set<Map.Entry<GlobalStreamId, Grouping>>> pendingInputsPerBolt = new HashMap<>();

        boolean progressed = true;
        while (!pendingBolts.isEmpty()) {
            if (!progressed) {
                throw new RuntimeException("Unable to build Topology. Could not connect the following bolts: " + pendingBolts.keySet());
            }
            progressed = false;

            final Iterator<Map.Entry<String, IRichBolt>> boltIterator = pendingBolts.entrySet().iterator();
            while (boltIterator.hasNext()) {
                final Map.Entry<String, IRichBolt> registration = boltIterator.next();
                final String boltId = registration.getKey();
                final IRichBolt bolt = registration.getValue();
                final ComponentCommon common = this.stormTopology.get_bolts().get(boltId).get_common();

                // Lazily snapshot the bolt's inputs the first time it is visited.
                Set<Map.Entry<GlobalStreamId, Grouping>> pendingInputs = pendingInputsPerBolt.get(boltId);
                if (pendingInputs == null) {
                    pendingInputs = new HashSet<>();
                    pendingInputs.addAll(common.get_inputs().entrySet());
                    pendingInputsPerBolt.put(boltId, pendingInputs);
                }

                final Iterator<Map.Entry<GlobalStreamId, Grouping>> inputIterator = pendingInputs.iterator();
                while (inputIterator.hasNext()) {
                    final Map.Entry<GlobalStreamId, Grouping> input = inputIterator.next();
                    final String producerId = input.getKey().get_componentId();
                    final String streamId = input.getKey().get_streamId();

                    final Map<String, DataStream> producerStreams = availableStreams.get(producerId);
                    if (producerStreams == null) {
                        // Producer not translated yet; retry this input in a later pass.
                        continue;
                    }
                    progressed = true;

                    DataStream inputStream = producerStreams.get(streamId);
                    if (inputStream == null) {
                        throw new RuntimeException("Cannot connect '" + boltId + "' to '" + producerId + "'. Stream '" + streamId + "' not found.");
                    }

                    final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
                    bolt.declareOutputFields(declarer);
                    final HashMap<String, Fields> declaredStreams = declarer.outputStreams;
                    this.outputStreams.put(boltId, declaredStreams);
                    this.declarers.put(boltId, declarer);

                    inputStream = applyGrouping(inputStream, input.getValue(), producerId, streamId, producerStreams);

                    // Fewer than two declared streams: typed output (or none); otherwise split output.
                    final boolean splitOutput = declaredStreams.size() >= 2;
                    String singleStreamId = null;
                    final TypeInformation outputType;
                    if (splitOutput) {
                        outputType = TypeExtractor.getForClass(SplitStreamType.class);
                    } else {
                        if (declaredStreams.size() == 1) {
                            singleStreamId = declaredStreams.keySet().iterator().next();
                        }
                        outputType = declarer.getOutputType(singleStreamId);
                    }

                    final BoltWrapper wrapper = new BoltWrapper(bolt, this.outputStreams.get(producerId).get(streamId));
                    final SingleOutputStreamOperator operator = inputStream.transform(boltId, outputType, wrapper);

                    if (splitOutput) {
                        final SplitStream split = operator.split(new StormStreamSelector());
                        final Map<String, DataStream> boltStreams = new HashMap<>();
                        for (String declaredId : declaredStreams.keySet()) {
                            boltStreams.put(declaredId, split.select(new String[]{declaredId}));
                        }
                        availableStreams.put(boltId, boltStreams);
                    } else if (outputType != null) {
                        // A null output type means nothing is registered for downstream bolts.
                        final Map<String, DataStream> boltStreams = new HashMap<>();
                        boltStreams.put(singleStreamId, operator);
                        availableStreams.put(boltId, boltStreams);
                    }

                    wrapper.setStormTopology(this.stormTopology);

                    final int parallelism;
                    if (!common.is_set_parallelism_hint()) {
                        // No hint given: record an explicit hint of 1 on the Storm component.
                        common.set_parallelism_hint(1);
                        parallelism = 1;
                    } else {
                        parallelism = common.get_parallelism_hint();
                        operator.setParallelism(parallelism);
                    }
                    env.increaseNumberOfTasks(parallelism);

                    inputIterator.remove();
                }

                if (pendingInputs.isEmpty()) {
                    boltIterator.remove();
                }
            }
        }
    }

    /**
     * Applies the Flink partitioning that corresponds to a Storm grouping.
     *
     * @param inputStream stream produced by {@code producerId} on {@code streamId}
     * @param grouping Storm grouping declared for this input
     * @param producerId component id of the producer; used to look up its declarer
     * @param streamId id of the consumed stream
     * @param producerStreams all streams registered for the producer; a single entry means the
     *     producer's records are keyed directly, otherwise via {@link SplitStreamTypeKeySelector}
     * @return the repartitioned stream, or {@code inputStream} itself for local-or-shuffle
     * @throws UnsupportedOperationException for any other grouping
     */
    // Raw Flink stream types are used on purpose: element types are only known at runtime.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private DataStream applyGrouping(DataStream inputStream, Grouping grouping, String producerId, String streamId, Map<String, DataStream> producerStreams) {
        if (grouping.is_set_shuffle()) {
            return inputStream.rebalance();
        }
        if (grouping.is_set_fields()) {
            if (grouping.get_fields().size() > 0) {
                final FlinkOutputFieldsDeclarer producerDeclarer = this.declarers.get(producerId);
                if (producerStreams.size() == 1) {
                    return inputStream.keyBy(producerDeclarer.getGroupingFieldIndexes(streamId, grouping.get_fields()));
                }
                return inputStream.keyBy(new SplitStreamTypeKeySelector(inputStream.getType(), producerDeclarer.getGroupingFieldIndexes(streamId, grouping.get_fields())));
            }
            // A fields grouping with an empty field list is mapped to global partitioning.
            return inputStream.global();
        }
        if (grouping.is_set_all()) {
            return inputStream.broadcast();
        }
        if (grouping.is_set_local_or_shuffle()) {
            return inputStream;
        }
        throw new UnsupportedOperationException("Flink only supports (local-or-)shuffle, fields, all, and global grouping");
    }

    /**
     * Registers a bolt, passing {@code null} as the third argument of
     * {@link #setBolt(String, IRichBolt, Number)}.
     *
     * @param str component id of the bolt
     * @param iRichBolt the bolt to register
     * @return the declarer returned by the wrapped Storm builder
     */
    public BoltDeclarer setBolt(String str, IRichBolt iRichBolt) {
        return setBolt(str, iRichBolt, (Number) null);
    }

    /**
     * Registers a bolt with the wrapped Storm builder and remembers it for translation.
     *
     * @param str component id of the bolt
     * @param iRichBolt the bolt to register
     * @param number forwarded unchanged to {@code TopologyBuilder#setBolt} (its parallelism hint
     *     argument in Storm's API); may be {@code null}
     * @return the declarer returned by the wrapped Storm builder
     */
    public BoltDeclarer setBolt(String str, IRichBolt iRichBolt, Number number) {
        final BoltDeclarer declarer = this.stormBuilder.setBolt(str, iRichBolt, number);
        this.bolts.put(str, iRichBolt);
        return declarer;
    }

    /**
     * Registers a basic bolt, passing {@code null} as the third argument of
     * {@link #setBolt(String, IBasicBolt, Number)}.
     *
     * @param str component id of the bolt
     * @param iBasicBolt the basic bolt to register
     * @return the declarer returned by the wrapped Storm builder
     */
    public BoltDeclarer setBolt(String str, IBasicBolt iBasicBolt) {
        return setBolt(str, iBasicBolt, (Number) null);
    }

    /**
     * Registers a basic bolt by wrapping it in a {@link BasicBoltExecutor}.
     *
     * @param str component id of the bolt
     * @param iBasicBolt the basic bolt to register
     * @param number forwarded unchanged to {@link #setBolt(String, IRichBolt, Number)}; may be
     *     {@code null}
     * @return the declarer returned by the wrapped Storm builder
     */
    public BoltDeclarer setBolt(String str, IBasicBolt iBasicBolt, Number number) {
        final IRichBolt executor = new BasicBoltExecutor(iBasicBolt);
        return setBolt(str, executor, number);
    }

    /**
     * Registers a spout, passing {@code null} as the third argument of
     * {@link #setSpout(String, IRichSpout, Number)}.
     *
     * @param str component id of the spout
     * @param iRichSpout the spout to register
     * @return the declarer returned by the wrapped Storm builder
     */
    public SpoutDeclarer setSpout(String str, IRichSpout iRichSpout) {
        return setSpout(str, iRichSpout, null);
    }

    /**
     * Registers a spout with the wrapped Storm builder and remembers it for translation.
     *
     * @param str component id of the spout
     * @param iRichSpout the spout to register
     * @param number forwarded unchanged to {@code TopologyBuilder#setSpout} (its parallelism hint
     *     argument in Storm's API); may be {@code null}
     * @return the declarer returned by the wrapped Storm builder
     */
    public SpoutDeclarer setSpout(String str, IRichSpout iRichSpout, Number number) {
        final SpoutDeclarer declarer = this.stormBuilder.setSpout(str, iRichSpout, number);
        this.spouts.put(str, iRichSpout);
        return declarer;
    }

    /**
     * Returns the Storm topology built by the most recent {@link #createTopology()} call.
     *
     * @return the Storm topology, or {@code null} if {@link #createTopology()} has not been called
     */
    StormTopology getStormTopology() {
        return this.stormTopology;
    }
}
