/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.storm.api;

import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer;
import org.apache.flink.storm.api.StormFlinkStreamMerger;
import org.apache.flink.storm.api.TwoFlinkStreamsMerger;
import org.apache.flink.storm.util.SplitStreamMapper;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormStreamSelector;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.storm.wrappers.MergedInputsBoltWrapper;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.storm.wrappers.StormTuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.util.InstantiationUtil;

/**
 * Translates the spouts and bolts registered in a Storm {@link TopologyBuilder} into operators of a Flink
 * {@link StreamExecutionEnvironment}.
 *
 * <p>The translation runs once, eagerly, when the instance is created via
 * {@link #createTopology(TopologyBuilder)}. Afterwards the resulting program can be started with
 * {@link #execute()} or further customized through {@link #getExecutionEnvironment()}.
 */
public class FlinkTopology {
    /** Declared output streams and their schemas, keyed by component (spout/bolt) id and then by stream id. */
    private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<>();
    /** Output-field declarers of all translated spouts and bolts, keyed by component id. */
    private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<>();
    /** Input subscriptions of every bolt that has been inspected so far, keyed by bolt id. */
    private final HashMap<String, Set<Map.Entry<GlobalStreamId, Grouping>>> unprocessedInputsPerBolt = new HashMap<>();
    /** Already translated data streams, keyed by producer component id and then by stream id. */
    final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<>();
    private final TopologyBuilder builder;
    private final StormTopology stormTopology;
    /** Private copy of the builder's spouts, keyed by spout id. */
    private final Map<String, IRichSpout> spouts;
    /** Private copy of the builder's bolts, keyed by bolt id; entries are removed as bolts get translated. */
    private final Map<String, IRichBolt> bolts;
    private final StreamExecutionEnvironment env;

    /**
     * Creates the Storm topology from the given builder, copies its spouts and bolts and immediately
     * translates them into the Flink execution environment.
     *
     * @param builder the Storm topology builder to translate
     */
    private FlinkTopology(TopologyBuilder builder) {
        this.builder = builder;
        this.stormTopology = builder.createTopology();
        this.spouts = this.getPrivateField("_spouts");
        this.bolts = this.getPrivateField("_bolts");
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.translateTopology();
    }

    /**
     * Creates a Flink program from the topology assembled in the given Storm builder.
     *
     * @param stormBuilder the Storm topology builder to translate
     * @return the translated topology
     * @throws RuntimeException if the builder's components cannot be read or copied, or if a bolt cannot
     *         be connected to its input streams
     */
    public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
        return new FlinkTopology(stormBuilder);
    }

    /**
     * Returns the Flink execution environment the topology was translated into.
     *
     * @return the underlying execution environment
     */
    public StreamExecutionEnvironment getExecutionEnvironment() {
        return this.env;
    }

    /**
     * Executes the translated program on the underlying execution environment.
     *
     * @return the result of the job execution
     * @throws Exception if the execution fails
     */
    public JobExecutionResult execute() throws Exception {
        return this.env.execute();
    }

    /**
     * Reads a private map-typed field of {@link TopologyBuilder} via reflection and returns a deep copy
     * of it.
     *
     * @param field the name of the field declared in {@link TopologyBuilder}
     * @param <T> the value type of the map
     * @return a serialization-based copy of the field's value
     * @throws RuntimeException if the field does not exist, is not accessible, or cannot be copied
     */
    private <T> Map<String, T> getPrivateField(String field) {
        try {
            // Look the field up on TopologyBuilder itself: getDeclaredField() does not search superclasses,
            // so using builder.getClass() would fail for subclasses of TopologyBuilder.
            Field f = TopologyBuilder.class.getDeclaredField(field);
            f.setAccessible(true);
            @SuppressWarnings("unchecked")
            Map<String, T> original = (Map<String, T>) f.get(this.builder);
            return this.copyObject(original);
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Couldn't get " + field + " from TopologyBuilder", e);
        }
    }

    /**
     * Creates a deep copy of the given object by serializing and deserializing it.
     *
     * @param object the object to copy
     * @param <T> the type of the object
     * @return the copy
     * @throws RuntimeException if serialization or deserialization fails; the original failure is
     *         attached as the cause
     */
    @SuppressWarnings("unchecked")
    private <T> T copyObject(T object) {
        try {
            return (T) InstantiationUtil.deserializeObject(InstantiationUtil.serializeObject(object), this.getClass().getClassLoader());
        }
        catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Failed to copy object.", e);
        }
    }

    /**
     * Translates all spouts and bolts into Flink operators. Any state of a previous translation is
     * discarded first.
     *
     * @throws RuntimeException if at least one bolt cannot be connected to its input streams
     */
    private void translateTopology() {
        this.unprocessedInputsPerBolt.clear();
        this.outputStreams.clear();
        this.declarers.clear();
        this.availableInputs.clear();
        // Operators without an explicit parallelism hint run with parallelism 1.
        this.env.setParallelism(1);

        for (Map.Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
            this.translateSpout(spout.getKey(), spout.getValue());
        }
        this.translateBolts();
    }

    /**
     * Adds a single spout as a source to the execution environment and registers its output streams in
     * {@link #availableInputs}.
     *
     * @param spoutId the id of the spout
     * @param userSpout the spout to translate
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void translateSpout(String spoutId, IRichSpout userSpout) {
        FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
        userSpout.declareOutputFields(declarer);
        HashMap<String, Fields> sourceStreams = declarer.outputStreams;
        this.outputStreams.put(spoutId, sourceStreams);
        this.declarers.put(spoutId, declarer);

        HashMap<String, DataStream<Tuple>> spoutOutputs = new HashMap<>();
        DataStreamSource<?> source;
        if (sourceStreams.size() == 1) {
            SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
            spoutWrapperSingleOutput.setStormTopology(this.stormTopology);
            String outputStreamId = sourceStreams.keySet().iterator().next();
            DataStreamSource<Tuple> singleSource = this.env.addSource(spoutWrapperSingleOutput, spoutId, declarer.getOutputType(outputStreamId));
            spoutOutputs.put(outputStreamId, singleSource);
            source = singleSource;
        } else {
            // Several declared streams: emit wrapped records and split them up by stream id afterwards.
            SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(userSpout, spoutId, null, null);
            spoutWrapperMultipleOutputs.setStormTopology(this.stormTopology);
            DataStreamSource<SplitStreamType<Tuple>> multiSource = this.env.addSource(spoutWrapperMultipleOutputs, spoutId, (TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));
            SplitStream<SplitStreamType<Tuple>> splitSource = multiSource.split(new StormStreamSelector<Tuple>());
            for (String streamId : sourceStreams.keySet()) {
                SingleOutputStreamOperator<Tuple> outStream = splitSource.select(streamId).map(new SplitStreamMapper<Tuple>());
                outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
                spoutOutputs.put(streamId, outStream);
            }
            source = multiSource;
        }
        this.availableInputs.put(spoutId, spoutOutputs);

        ComponentCommon common = this.stormTopology.get_spouts().get(spoutId).get_common();
        if (common.is_set_parallelism_hint()) {
            source.setParallelism(common.get_parallelism_hint());
        } else {
            common.set_parallelism_hint(1);
        }
    }

    /**
     * Translates all bolts. A bolt can only be translated once every stream it subscribes to is
     * available, and the iteration order of the bolts is not tied to the topology's structure, so the
     * remaining bolts are scanned repeatedly until all are connected or a full pass makes no progress.
     *
     * @throws RuntimeException if a full pass over the remaining bolts could not connect any of them
     */
    private void translateBolts() {
        boolean makeProgress = true;
        while (this.bolts.size() > 0) {
            if (!makeProgress) {
                throw new RuntimeException(this.describeUnconnectedBolts());
            }
            makeProgress = false;

            Iterator<Map.Entry<String, IRichBolt>> boltsIterator = this.bolts.entrySet().iterator();
            while (boltsIterator.hasNext()) {
                Map.Entry<String, IRichBolt> bolt = boltsIterator.next();
                String boltId = bolt.getKey();
                ComponentCommon common = this.stormTopology.get_bolts().get(boltId).get_common();

                Set<Map.Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = this.unprocessedInputsPerBolt.get(boltId);
                if (unprocessedBoltInputs == null) {
                    unprocessedBoltInputs = new HashSet<>();
                    unprocessedBoltInputs.addAll(common.get_inputs().entrySet());
                    this.unprocessedInputsPerBolt.put(boltId, unprocessedBoltInputs);
                }

                if (!this.allInputsAvailable(unprocessedBoltInputs)) {
                    // Some producer has not been translated yet; retry this bolt in a later pass.
                    continue;
                }

                // Copy the bolt only now, so bolts that have to wait are not re-serialized on every pass.
                IRichBolt userBolt = this.copyObject(bolt.getValue());
                makeProgress = true;
                boltsIterator.remove();

                Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(unprocessedBoltInputs.size());
                for (Map.Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) {
                    GlobalStreamId streamId = input.getKey();
                    Map<String, DataStream<Tuple>> producer = this.availableInputs.get(streamId.get_componentId());
                    inputStreams.put(streamId, this.processInput(boltId, userBolt, streamId, input.getValue(), producer));
                }

                SingleOutputStreamOperator<?> outputStream = this.createOutput(boltId, userBolt, inputStreams);
                if (common.is_set_parallelism_hint()) {
                    outputStream.setParallelism(common.get_parallelism_hint());
                } else {
                    common.set_parallelism_hint(1);
                }
            }
        }
    }

    /**
     * Checks whether every given input subscription refers to a stream registered in
     * {@link #availableInputs}.
     *
     * @param inputs the input subscriptions of a bolt
     * @return {@code true} if all subscribed streams are available (trivially so for no inputs)
     */
    private boolean allInputsAvailable(Set<Map.Entry<GlobalStreamId, Grouping>> inputs) {
        for (Map.Entry<GlobalStreamId, Grouping> entry : inputs) {
            HashMap<String, DataStream<Tuple>> streams = this.availableInputs.get(entry.getKey().get_componentId());
            if (streams == null || streams.get(entry.getKey().get_streamId()) == null) {
                return false;
            }
        }
        return true;
    }

    /**
     * Builds the error message that lists every bolt still waiting to be connected together with its
     * subscribed input streams.
     *
     * @return the error message
     */
    private String describeUnconnectedBolts() {
        StringBuilder strBld = new StringBuilder();
        strBld.append("Unable to build Topology. Could not connect the following bolts:");
        for (String boltId : this.bolts.keySet()) {
            strBld.append("\n  ");
            strBld.append(boltId);
            strBld.append(": missing input streams [");
            for (Map.Entry<GlobalStreamId, Grouping> entry : this.unprocessedInputsPerBolt.get(boltId)) {
                strBld.append("'");
                strBld.append(entry.getKey().get_streamId());
                strBld.append("' from '");
                strBld.append(entry.getKey().get_componentId());
                strBld.append("'; ");
            }
            strBld.append("]");
        }
        return strBld.toString();
    }

    /**
     * Prepares one input stream of a bolt by applying the partitioning that corresponds to the Storm
     * grouping. As a side effect the bolt's output fields are declared and registered in
     * {@link #declarers} and {@link #outputStreams}.
     *
     * @param boltId the id of the consuming bolt
     * @param userBolt the consuming bolt
     * @param streamId the subscribed stream (producer component id and stream id)
     * @param grouping the Storm grouping of the subscription
     * @param producer the output streams of the producing component, keyed by stream id
     * @return the partitioned input stream
     * @throws UnsupportedOperationException if the grouping is not (local-or-)shuffle, fields, all or global
     */
    private DataStream<Tuple> processInput(String boltId, IRichBolt userBolt, GlobalStreamId streamId, Grouping grouping, Map<String, DataStream<Tuple>> producer) {
        assert (userBolt != null);
        assert (boltId != null);
        assert (streamId != null);
        assert (grouping != null);
        assert (producer != null);
        String producerId = streamId.get_componentId();
        String inputStreamId = streamId.get_streamId();
        DataStream<Tuple> inputStream = producer.get(inputStreamId);

        FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
        this.declarers.put(boltId, declarer);
        userBolt.declareOutputFields(declarer);
        this.outputStreams.put(boltId, declarer.outputStreams);

        if (grouping.is_set_shuffle()) {
            inputStream = inputStream.rebalance();
        } else if (grouping.is_set_fields()) {
            List<String> fields = grouping.get_fields();
            if (fields.size() > 0) {
                FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
                inputStream = inputStream.keyBy(prodDeclarer.getGroupingFieldIndexes(inputStreamId, fields));
            } else {
                // A fields grouping with an empty field list is treated as global grouping.
                inputStream = inputStream.global();
            }
        } else if (grouping.is_set_all()) {
            inputStream = inputStream.broadcast();
        } else if (!grouping.is_set_local_or_shuffle()) {
            throw new UnsupportedOperationException("Flink only supports (local-or-)shuffle, fields, all, and global grouping");
        }
        // local-or-shuffle grouping leaves the stream's partitioning untouched.
        return inputStream;
    }

    /**
     * Creates the Flink operator for a bolt, connects it to its (possibly merged) input streams and
     * registers the bolt's output streams in {@link #availableInputs}.
     *
     * @param boltId the id of the bolt
     * @param bolt the bolt to wrap
     * @param inputStreams the prepared input streams of the bolt; must contain at least one entry
     * @return the operator that runs the bolt
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private SingleOutputStreamOperator<?> createOutput(String boltId, IRichBolt bolt, Map<GlobalStreamId, DataStream<Tuple>> inputStreams) {
        assert (boltId != null);
        assert (bolt != null);
        assert (inputStreams != null);
        Iterator<Map.Entry<GlobalStreamId, DataStream<Tuple>>> iterator = inputStreams.entrySet().iterator();
        Map.Entry<GlobalStreamId, DataStream<Tuple>> input1 = iterator.next();
        GlobalStreamId streamId1 = input1.getKey();
        String inputStreamId1 = streamId1.get_streamId();
        String inputComponentId1 = streamId1.get_componentId();
        Fields inputSchema1 = this.outputStreams.get(inputComponentId1).get(inputStreamId1);
        DataStream<Tuple> singleInputStream = input1.getValue();

        // With more than one input, fold all inputs pairwise into a single stream of StormTuples.
        DataStream<StormTuple<Tuple>> mergedInputStream = null;
        while (iterator.hasNext()) {
            Map.Entry<GlobalStreamId, DataStream<Tuple>> input2 = iterator.next();
            GlobalStreamId streamId2 = input2.getKey();
            DataStream<Tuple> inputStream2 = input2.getValue();
            Fields inputSchema2 = this.outputStreams.get(streamId2.get_componentId()).get(streamId2.get_streamId());
            if (mergedInputStream == null) {
                mergedInputStream = singleInputStream.connect(inputStream2).flatMap(new TwoFlinkStreamsMerger(streamId1, inputSchema1, streamId2, inputSchema2)).returns(StormTuple.class);
            } else {
                mergedInputStream = mergedInputStream.connect(inputStream2).flatMap(new StormFlinkStreamMerger(streamId2, inputSchema2)).returns(StormTuple.class);
            }
        }

        HashMap<String, Fields> boltOutputs = this.outputStreams.get(boltId);
        FlinkOutputFieldsDeclarer declarer = this.declarers.get(boltId);
        boolean singleInput = inputStreams.entrySet().size() == 1;
        SingleOutputStreamOperator<?> outputStream;

        if (boltOutputs.size() < 2) {
            // At most one declared output stream.
            String outputStreamId = boltOutputs.size() == 1 ? boltOutputs.keySet().iterator().next() : null;
            TypeInformation<Tuple> outType = declarer.getOutputType(outputStreamId);
            SingleOutputStreamOperator<Tuple> outStream;
            if (singleInput) {
                BoltWrapper<Tuple, Tuple> boltWrapper = new BoltWrapper<Tuple, Tuple>(bolt, boltId, inputStreamId1, inputComponentId1, inputSchema1, null);
                boltWrapper.setStormTopology(this.stormTopology);
                outStream = singleInputStream.transform(boltId, outType, boltWrapper);
            } else {
                MergedInputsBoltWrapper<Tuple, Tuple> boltWrapper = new MergedInputsBoltWrapper<Tuple, Tuple>(bolt, boltId, null);
                boltWrapper.setStormTopology(this.stormTopology);
                outStream = mergedInputStream.transform(boltId, outType, boltWrapper);
            }
            if (outType != null) {
                // Only expose the output to downstream bolts when an output type exists.
                HashMap<String, DataStream<Tuple>> op = new HashMap<>();
                op.put(outputStreamId, outStream);
                this.availableInputs.put(boltId, op);
            }
            outputStream = outStream;
        } else {
            // Several declared streams: emit wrapped records and split them up by stream id afterwards.
            TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) TypeExtractor.getForClass(SplitStreamType.class);
            SingleOutputStreamOperator<SplitStreamType<Tuple>> multiStream;
            if (singleInput) {
                BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<Tuple, SplitStreamType<Tuple>>(bolt, boltId, inputStreamId1, inputComponentId1, inputSchema1, null);
                boltWrapperMultipleOutputs.setStormTopology(this.stormTopology);
                multiStream = singleInputStream.transform(boltId, outType, boltWrapperMultipleOutputs);
            } else {
                MergedInputsBoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new MergedInputsBoltWrapper<Tuple, SplitStreamType<Tuple>>(bolt, boltId, null);
                boltWrapperMultipleOutputs.setStormTopology(this.stormTopology);
                multiStream = mergedInputStream.transform(boltId, outType, boltWrapperMultipleOutputs);
            }
            SplitStream<SplitStreamType<Tuple>> splitStream = multiStream.split(new StormStreamSelector<Tuple>());
            HashMap<String, DataStream<Tuple>> op = new HashMap<>();
            for (String outputStreamId : boltOutputs.keySet()) {
                // Build the select/map pipeline exactly once per stream; a second, discarded pipeline
                // would leave a redundant, unconsumed map operator behind.
                SingleOutputStreamOperator<Tuple> outStream = splitStream.select(outputStreamId).map(new SplitStreamMapper<Tuple>());
                outStream.getTransformation().setOutputType(declarer.getOutputType(outputStreamId));
                op.put(outputStreamId, outStream);
            }
            this.availableInputs.put(boltId, op);
            outputStream = multiStream;
        }
        return outputStream;
    }

    /**
     * Returns the Storm topology created from the builder this instance was constructed with.
     *
     * @return the Storm topology
     */
    public StormTopology getStormTopology() {
        return this.stormTopology;
    }
}

