package org.apache.flink.storm.api;

import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.util.SplitStreamMapper;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormStreamSelector;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.storm.wrappers.MergedInputsBoltWrapper;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.storm.wrappers.StormTuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.InstantiationUtil;

/* loaded from: input_file:org/apache/flink/storm/api/FlinkTopology.class */
public class FlinkTopology {
    /** Builder the topology was declared with; its private spout/bolt maps are read reflectively. */
    private final TopologyBuilder builder;
    /** Storm topology produced by {@code builder.createTopology()}. */
    private final StormTopology stormTopology;
    /** Cached negation of the class's desired assertion status; set in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Declared output schemas, keyed by component id and then by stream id. */
    private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<>();
    /** Output-field declarer of every translated component, keyed by component id. */
    private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<>();
    /** Inputs (stream id and grouping) of each bolt visited during translation, keyed by bolt id. */
    private final HashMap<String, Set<Map.Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt = new HashMap<>();
    /** Flink streams already created, keyed by producing component id and then by stream id. */
    final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<>();
    /** Copy of the builder's {@code _spouts} map. */
    private final Map<String, IRichSpout> spouts;
    /** Copy of the builder's {@code _bolts} map; entries are removed as bolts are translated. */
    private final Map<String, IRichBolt> bolts;
    /** Flink environment the translated program is added to. */
    private final StreamExecutionEnvironment env;

    /**
     * Captures the builder, extracts its spouts and bolts, and translates the topology
     * immediately.
     *
     * <p>The spout and bolt maps are assigned here rather than in field initializers:
     * {@link #getPrivateField(String)} reads {@code this.builder}, and field initializers run
     * before the constructor body, i.e. while {@code builder} is still {@code null}.
     *
     * @param topologyBuilder builder holding the Storm topology to translate
     */
    private FlinkTopology(TopologyBuilder topologyBuilder) {
        this.builder = topologyBuilder;
        this.stormTopology = topologyBuilder.createTopology();
        // Both lookups depend on this.builder being set (see Javadoc above).
        this.spouts = getPrivateField("_spouts");
        this.bolts = getPrivateField("_bolts");
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        translateTopology();
    }

    /**
     * Static factory for {@link FlinkTopology}; the private constructor translates the given
     * topology as part of construction.
     *
     * @param topologyBuilder builder holding the Storm topology to translate
     * @return the newly constructed topology
     */
    public static FlinkTopology createTopology(TopologyBuilder topologyBuilder) {
        final FlinkTopology topology = new FlinkTopology(topologyBuilder);
        return topology;
    }

    /**
     * Returns the Flink execution environment this topology was translated into.
     *
     * @return the environment held by this instance
     */
    public StreamExecutionEnvironment getExecutionEnvironment() {
        return env;
    }

    /**
     * Runs the translated program by delegating to the environment's {@code execute()}.
     *
     * @return the result reported by the environment
     * @throws Exception whatever the environment's {@code execute()} throws
     */
    public JobExecutionResult execute() throws Exception {
        final JobExecutionResult result = env.execute();
        return result;
    }

    /**
     * Reads a private map-typed field of the {@link TopologyBuilder} via reflection and returns
     * a copy of it (see {@link #copyObject(Object)}).
     *
     * <p>The field is searched on the builder's runtime class first and then on its
     * superclasses, because {@code Class.getDeclaredField} only inspects the class it is called
     * on; a subclass of {@code TopologyBuilder} would otherwise never be resolvable.
     *
     * @param str name of the field to read, e.g. {@code "_spouts"}
     * @param <T> value type of the map stored in the field
     * @return a copy of the map held by the field
     * @throws RuntimeException if the field does not exist in the class hierarchy or cannot be
     *     accessed
     */
    @SuppressWarnings("unchecked") // the caller picks T to match the field's declared map type
    private <T> Map<String, T> getPrivateField(String str) {
        Field field = null;
        NoSuchFieldException lookupFailure = null;
        for (Class<?> type = this.builder.getClass(); type != null && field == null; type = type.getSuperclass()) {
            try {
                field = type.getDeclaredField(str);
            } catch (NoSuchFieldException e) {
                // Keep the first failure as the cause; continue with the superclass.
                if (lookupFailure == null) {
                    lookupFailure = e;
                }
            }
        }
        if (field == null) {
            throw new RuntimeException("Couldn't get " + str + " from TopologyBuilder", lookupFailure);
        }
        try {
            field.setAccessible(true);
            return copyObject((Map<String, T>) field.get(this.builder));
        } catch (IllegalAccessException e) {
            throw new RuntimeException("Couldn't get " + str + " from TopologyBuilder", e);
        }
    }

    /**
     * Copies an object by serializing it and deserializing the bytes again with this class's
     * class loader (via {@code InstantiationUtil}).
     *
     * @param t object to copy — presumably has to be serializable; confirm against
     *     {@code InstantiationUtil.serializeObject}
     * @param <T> type of the object
     * @return the deserialized copy
     * @throws RuntimeException if serialization or deserialization fails; the underlying
     *     exception is attached as the cause
     */
    @SuppressWarnings("unchecked") // the round trip yields an instance of the same class as t
    private <T> T copyObject(T t) {
        try {
            final byte[] serialized = InstantiationUtil.serializeObject(t);
            return (T) InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            // Preserve the cause so the actual serialization problem is diagnosable.
            throw new RuntimeException("Failed to copy object.", e);
        }
    }

    /**
     * Translates the Storm topology into a Flink streaming program on {@link #env}.
     *
     * <p>All bookkeeping maps are reset, the environment's parallelism is set to 1, then every
     * spout is added as a source and every bolt is wired to its inputs.
     *
     * @throws RuntimeException if some bolts can never be connected because at least one of
     *     their input streams is not produced by any translated component
     */
    private void translateTopology() {
        this.unprocessdInputsPerBolt.clear();
        this.outputStreams.clear();
        this.declarers.clear();
        this.availableInputs.clear();
        // Environment-wide parallelism; components with a parallelism hint override it below.
        this.env.setParallelism(1);
        translateSpouts();
        translateBolts();
    }

    /** Adds one Flink source per spout and registers its output streams in {@link #availableInputs}. */
    private void translateSpouts() {
        for (Map.Entry<String, IRichSpout> spoutEntry : this.spouts.entrySet()) {
            String spoutId = spoutEntry.getKey();
            IRichSpout spout = spoutEntry.getValue();

            FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
            spout.declareOutputFields(declarer);
            HashMap<String, Fields> declaredStreams = declarer.outputStreams;
            this.outputStreams.put(spoutId, declaredStreams);
            this.declarers.put(spoutId, declarer);

            SpoutWrapper spoutWrapper = new SpoutWrapper(spout, spoutId, null, null);
            spoutWrapper.setStormTopology(this.stormTopology);

            HashMap<String, DataStream<Tuple>> streamsById = new HashMap<>();
            DataStreamSource source;
            if (declaredStreams.size() == 1) {
                // Exactly one declared stream: the source itself is that stream.
                String streamId = (String) declaredStreams.keySet().toArray()[0];
                source = this.env.addSource(spoutWrapper, spoutId, declarer.getOutputType(streamId));
                streamsById.put(streamId, source);
            } else {
                // Otherwise the source emits SplitStreamType records that are split by stream id
                // and mapped back to plain tuples, one Flink stream per declared Storm stream.
                source = this.env.addSource(spoutWrapper, spoutId, TypeExtractor.getForClass(SplitStreamType.class));
                SplitStream split = source.split(new StormStreamSelector());
                for (String streamId : declaredStreams.keySet()) {
                    DataStream<Tuple> stream = split.select(new String[]{streamId}).map(new SplitStreamMapper());
                    stream.getTransformation().setOutputType(declarer.getOutputType(streamId));
                    streamsById.put(streamId, stream);
                }
            }
            this.availableInputs.put(spoutId, streamsById);

            ComponentCommon common = this.stormTopology.get_spouts().get(spoutId).get_common();
            if (common.is_set_parallelism_hint()) {
                source.setParallelism(common.get_parallelism_hint());
            } else {
                // Record the effective parallelism in the Storm topology as well.
                common.set_parallelism_hint(1);
            }
        }
    }

    /**
     * Wires bolts to their inputs in repeated passes over the not-yet-translated bolts. A bolt
     * is translated (and removed from {@link #bolts}) once all of its inputs are available; a
     * full pass without any progress means the remaining bolts can never be connected.
     */
    private void translateBolts() {
        boolean madeProgress = true;
        while (this.bolts.size() > 0) {
            if (!madeProgress) {
                throw new RuntimeException(describeUnconnectedBolts());
            }
            madeProgress = false;

            Iterator<Map.Entry<String, IRichBolt>> pending = this.bolts.entrySet().iterator();
            while (pending.hasNext()) {
                Map.Entry<String, IRichBolt> boltEntry = pending.next();
                String boltId = boltEntry.getKey();
                ComponentCommon common = this.stormTopology.get_bolts().get(boltId).get_common();

                Set<Map.Entry<GlobalStreamId, Grouping>> inputs = this.unprocessdInputsPerBolt.get(boltId);
                if (inputs == null) {
                    inputs = new HashSet<>(common.get_inputs().entrySet());
                    this.unprocessdInputsPerBolt.put(boltId, inputs);
                }

                // NOTE(review): a bolt without any declared input passes this check and reaches
                // createOutput with an empty input map.
                boolean ready = true;
                for (Map.Entry<GlobalStreamId, Grouping> input : inputs) {
                    if (!isInputAvailable(input.getKey())) {
                        ready = false;
                        break;
                    }
                }
                if (!ready) {
                    continue;
                }

                // Copy the bolt only now that it is actually wired, instead of paying a
                // serialization round trip for every pending bolt on every pass.
                IRichBolt bolt = copyObject(boltEntry.getValue());
                madeProgress = true;
                pending.remove();

                HashMap<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(inputs.size());
                for (Map.Entry<GlobalStreamId, Grouping> input : inputs) {
                    GlobalStreamId streamId = input.getKey();
                    inputStreams.put(streamId, processInput(boltId, bolt, streamId, input.getValue(), this.availableInputs.get(streamId.get_componentId())));
                }

                SingleOutputStreamOperator<?> output = createOutput(boltId, bolt, inputStreams);
                if (common.is_set_parallelism_hint()) {
                    output.setParallelism(common.get_parallelism_hint());
                } else {
                    common.set_parallelism_hint(1);
                }
            }
        }
    }

    /** Returns whether the stream identified by {@code streamId} has already been created. */
    private boolean isInputAvailable(GlobalStreamId streamId) {
        HashMap<String, DataStream<Tuple>> producerStreams = this.availableInputs.get(streamId.get_componentId());
        return producerStreams != null && producerStreams.get(streamId.get_streamId()) != null;
    }

    /**
     * Builds the error message for bolts that could not be connected, listing for each remaining
     * bolt only those input streams that are actually unavailable.
     */
    private String describeUnconnectedBolts() {
        StringBuilder message = new StringBuilder();
        message.append("Unable to build Topology. Could not connect the following bolts:");
        for (String boltId : this.bolts.keySet()) {
            message.append("\n  ").append(boltId).append(": missing input streams [");
            for (Map.Entry<GlobalStreamId, Grouping> input : this.unprocessdInputsPerBolt.get(boltId)) {
                GlobalStreamId streamId = input.getKey();
                if (isInputAvailable(streamId)) {
                    continue;
                }
                message.append("'").append(streamId.get_streamId());
                message.append("' from '").append(streamId.get_componentId()).append("'; ");
            }
            message.append("]");
        }
        return message.toString();
    }

    /**
     * Prepares one input stream of a bolt: registers the bolt's declarer and output schemas,
     * then applies the Flink partitioning that corresponds to the Storm grouping.
     *
     * <ul>
     *   <li>shuffle grouping: {@code rebalance()}</li>
     *   <li>fields grouping with at least one field: {@code keyBy} on the producer's field
     *       indexes; with no fields: {@code global()}</li>
     *   <li>all grouping: {@code broadcast()}</li>
     *   <li>local-or-shuffle grouping: the stream is returned unchanged</li>
     * </ul>
     *
     * @param str id of the consuming bolt
     * @param iRichBolt the consuming bolt
     * @param globalStreamId producer component id and stream id of the input
     * @param grouping Storm grouping declared for this input
     * @param map streams of the producing component, keyed by stream id
     * @return the input stream with the grouping applied
     * @throws UnsupportedOperationException for any other grouping
     */
    private DataStream<Tuple> processInput(String str, IRichBolt iRichBolt, GlobalStreamId globalStreamId, Grouping grouping, Map<String, DataStream<Tuple>> map) {
        // Assertion-style guard: only active when assertions are enabled for this class.
        if (!$assertionsDisabled
                && (iRichBolt == null || str == null || globalStreamId == null || grouping == null || map == null)) {
            throw new AssertionError();
        }

        final String producerId = globalStreamId.get_componentId();
        final String inputStreamId = globalStreamId.get_streamId();
        final DataStream<Tuple> inputStream = map.get(inputStreamId);

        // Register the bolt's declarer and declared outputs (repeated for each input of the bolt).
        final FlinkOutputFieldsDeclarer boltDeclarer = new FlinkOutputFieldsDeclarer();
        this.declarers.put(str, boltDeclarer);
        iRichBolt.declareOutputFields(boltDeclarer);
        this.outputStreams.put(str, boltDeclarer.outputStreams);

        if (grouping.is_set_shuffle()) {
            return inputStream.rebalance();
        }
        if (grouping.is_set_fields()) {
            if (grouping.get_fields().size() > 0) {
                final FlinkOutputFieldsDeclarer producerDeclarer = this.declarers.get(producerId);
                return inputStream.keyBy(producerDeclarer.getGroupingFieldIndexes(inputStreamId, grouping.get_fields()));
            }
            // A fields grouping without fields is handled as global grouping.
            return inputStream.global();
        }
        if (grouping.is_set_all()) {
            return inputStream.broadcast();
        }
        if (grouping.is_set_local_or_shuffle()) {
            return inputStream;
        }
        throw new UnsupportedOperationException("Flink only supports (local-or-)shuffle, fields, all, and global grouping");
    }

    /**
     * Adds the operator for a bolt to the Flink program and registers the bolt's output streams
     * in {@link #availableInputs}.
     *
     * <p>With a single input the bolt is wrapped in a {@code BoltWrapper} on that stream. With
     * several inputs the streams are first merged pairwise into one {@code StormTuple} stream
     * and the bolt is wrapped in a {@code MergedInputsBoltWrapper}. If the bolt declares two or
     * more output streams, the operator emits {@code SplitStreamType} records that are split per
     * stream id and mapped back to tuples.
     *
     * @param str id of the bolt
     * @param iRichBolt the bolt to wrap
     * @param map grouped input streams of the bolt, keyed by producer/stream id; the first entry
     *     is read unconditionally, so the map must not be empty
     * @return the operator created for the bolt
     */
    private SingleOutputStreamOperator<?> createOutput(String str, IRichBolt iRichBolt, Map<GlobalStreamId, DataStream<Tuple>> map) {
        // Assertion-style guard: only active when assertions are enabled for this class.
        if (!$assertionsDisabled && (str == null || iRichBolt == null || map == null)) {
            throw new AssertionError();
        }

        Iterator<Map.Entry<GlobalStreamId, DataStream<Tuple>>> inputs = map.entrySet().iterator();
        Map.Entry<GlobalStreamId, DataStream<Tuple>> firstInput = inputs.next();
        GlobalStreamId firstId = firstInput.getKey();
        String firstStreamId = firstId.get_streamId();
        String firstProducerId = firstId.get_componentId();
        Fields firstSchema = this.outputStreams.get(firstProducerId).get(firstStreamId);
        DataStream<Tuple> firstStream = firstInput.getValue();

        // Fold all further inputs into one merged stream; stays null for a single input.
        SingleOutputStreamOperator merged = null;
        while (inputs.hasNext()) {
            Map.Entry<GlobalStreamId, DataStream<Tuple>> nextInput = inputs.next();
            GlobalStreamId nextId = nextInput.getKey();
            DataStream<Tuple> nextStream = nextInput.getValue();
            Fields nextSchema = this.outputStreams.get(nextId.get_componentId()).get(nextId.get_streamId());
            if (merged == null) {
                merged = firstStream.connect(nextStream).flatMap(new TwoFlinkStreamsMerger(firstId, firstSchema, nextId, nextSchema)).returns(StormTuple.class);
            } else {
                merged = merged.connect(nextStream).flatMap(new StormFlinkStreamMerger(nextId, nextSchema)).returns(StormTuple.class);
            }
        }

        HashMap<String, Fields> boltOutputs = this.outputStreams.get(str);
        FlinkOutputFieldsDeclarer declarer = this.declarers.get(str);

        // Output type of the bolt operator: the declared tuple type for at most one output
        // stream, SplitStreamType when the outputs have to be split afterwards.
        boolean splitOutputs = boltOutputs.size() >= 2;
        String soleOutputId = null;
        TypeInformation operatorType;
        if (splitOutputs) {
            operatorType = TypeExtractor.getForClass(SplitStreamType.class);
        } else {
            soleOutputId = boltOutputs.size() == 1 ? (String) boltOutputs.keySet().toArray()[0] : null;
            operatorType = declarer.getOutputType(soleOutputId);
        }

        SingleOutputStreamOperator boltOperator;
        if (map.entrySet().size() == 1) {
            BoltWrapper boltWrapper = new BoltWrapper(iRichBolt, str, firstStreamId, firstProducerId, firstSchema, null);
            boltWrapper.setStormTopology(this.stormTopology);
            boltOperator = firstStream.transform(str, operatorType, boltWrapper);
        } else {
            MergedInputsBoltWrapper mergedWrapper = new MergedInputsBoltWrapper(iRichBolt, str, null);
            mergedWrapper.setStormTopology(this.stormTopology);
            boltOperator = merged.transform(str, operatorType, mergedWrapper);
        }

        if (splitOutputs) {
            SplitStream split = boltOperator.split(new StormStreamSelector());
            HashMap<String, DataStream<Tuple>> outputsById = new HashMap<>();
            for (String outputId : boltOutputs.keySet()) {
                // Exactly one select/map chain per output stream: a second, discarded chain
                // would still be added to the program as an unused operator.
                SingleOutputStreamOperator output = split.select(new String[]{outputId}).map(new SplitStreamMapper());
                output.getTransformation().setOutputType(declarer.getOutputType(outputId));
                outputsById.put(outputId, output);
            }
            this.availableInputs.put(str, outputsById);
        } else if (operatorType != null) {
            // Only registered when the declarer reports an output type for the stream.
            HashMap<String, DataStream<Tuple>> outputsById = new HashMap<>();
            outputsById.put(soleOutputId, boltOperator);
            this.availableInputs.put(str, outputsById);
        }
        return boltOperator;
    }

    /**
     * Returns the Storm topology that was created from the builder in the constructor.
     *
     * @return the Storm topology held by this instance
     */
    public StormTopology getStormTopology() {
        return stormTopology;
    }

    static {
        $assertionsDisabled = !FlinkTopology.class.desiredAssertionStatus();
    }
}
