/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.storm.wrappers;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.MessageId;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.shaded.com.google.common.collect.Sets;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.storm.wrappers.BoltCollector;
import org.apache.flink.storm.wrappers.StormTuple;
import org.apache.flink.storm.wrappers.WrapperSetupHelper;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Flink stream operator that hosts a Storm {@link IRichBolt}.
 *
 * <p>Every incoming Flink record is wrapped into a {@link StormTuple} and handed to the bolt's
 * {@code execute} method; whatever the bolt emits through its Storm {@link OutputCollector} is
 * routed by a {@link BoltCollector} into this operator's Flink output.
 *
 * <p>The wrapper works in two modes, selected by whether {@link #setStormTopology(StormTopology)}
 * was called with a non-null topology before {@link #open()}:
 * <ul>
 * <li><b>standalone</b> (no topology): all tuples share the input schema, input stream ID and
 * input component ID given to the constructor and carry {@code -1} as producer task ID;</li>
 * <li><b>topology</b>: the last field of each incoming Flink tuple is read as the producer task
 * ID, and schema, stream ID and component ID are looked up per task ID.</li>
 * </ul>
 *
 * @param <IN> type of the incoming Flink records
 * @param <OUT> type of the records emitted to Flink
 */
public class BoltWrapper<IN, OUT>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT> {
    private static final long serialVersionUID = -4788589118464155835L;
    /** Input component ID used when the caller does not supply one. */
    public static final String DEFAULT_ID = "default ID";
    /**
     * Bolt name used when the caller does not supply one. The identifier is misspelled
     * ("DEFUALT"), but it is public and is therefore kept as is for compatibility.
     */
    public static final String DEFUALT_BOLT_NAME = "Unnamed Bolt";
    /** The wrapped Storm bolt. */
    protected final IRichBolt bolt;
    /** Name of the bolt, passed on when the topology context is created. */
    private final String name;
    /** Result of {@link WrapperSetupHelper#getNumberOfAttributes}; handed to the {@link BoltCollector}. */
    private final HashMap<String, Integer> numberOfAttributes;
    /** The Storm topology this bolt belongs to, or {@code null} in standalone mode. */
    private StormTopology stormTopology;
    /** Topology context created in {@link #open()}. */
    private transient TopologyContext topologyContext;
    /** Input stream ID per producer task ID; the {@code null} key holds the standalone value. */
    private final HashMap<Integer, String> inputStreamIds = new HashMap<Integer, String>();
    /** Input component ID per producer task ID; the {@code null} key holds the standalone value. */
    private final HashMap<Integer, String> inputComponentIds = new HashMap<Integer, String>();
    /** Input schema per producer task ID; the {@code null} key holds the standalone value. */
    private final HashMap<Integer, Fields> inputSchemas = new HashMap<Integer, Fields>();
    /** Collector writing to the Flink output; created in {@link #open()}. */
    protected transient TimestampedCollector<OUT> flinkCollector;

    /**
     * Wraps {@code bolt} without an input schema and without raw output streams.
     *
     * @param bolt the Storm bolt to wrap
     * @throws IllegalArgumentException as declared by the delegate constructor
     */
    public BoltWrapper(IRichBolt bolt) throws IllegalArgumentException {
        this(bolt, null, (Collection<String>)null);
    }

    /**
     * Wraps {@code bolt} with the given input schema and without raw output streams.
     *
     * @param bolt the Storm bolt to wrap
     * @param inputSchema schema attached to incoming tuples in standalone mode; may be {@code null}
     * @throws IllegalArgumentException as declared by the delegate constructor
     */
    public BoltWrapper(IRichBolt bolt, Fields inputSchema) throws IllegalArgumentException {
        this(bolt, inputSchema, (Collection<String>)null);
    }

    /**
     * Wraps {@code bolt} without an input schema, using the given raw outputs.
     *
     * @param bolt the Storm bolt to wrap
     * @param rawOutputs raw output names, de-duplicated into a set; must not be {@code null}
     * @throws IllegalArgumentException as declared by the delegate constructor
     */
    public BoltWrapper(IRichBolt bolt, String[] rawOutputs) throws IllegalArgumentException {
        // Pass the String[] straight to the varargs factory so the result is a set of String.
        this(bolt, null, Sets.newHashSet(rawOutputs));
    }

    /**
     * Wraps {@code bolt} without an input schema, using the given raw outputs.
     *
     * @param bolt the Storm bolt to wrap
     * @param rawOutputs raw output names; forwarded unchanged
     * @throws IllegalArgumentException as declared by the delegate constructor
     */
    public BoltWrapper(IRichBolt bolt, Collection<String> rawOutputs) throws IllegalArgumentException {
        this(bolt, null, rawOutputs);
    }

    /**
     * Wraps {@code bolt} with the given input schema and raw outputs.
     *
     * @param bolt the Storm bolt to wrap
     * @param inputSchema schema attached to incoming tuples in standalone mode; may be {@code null}
     * @param rawOutputs raw output names, de-duplicated into a set; must not be {@code null}
     * @throws IllegalArgumentException as declared by the delegate constructor
     */
    public BoltWrapper(IRichBolt bolt, Fields inputSchema, String[] rawOutputs) throws IllegalArgumentException {
        this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
    }

    /**
     * Wraps {@code bolt} with the given input schema and raw outputs, using
     * {@link #DEFUALT_BOLT_NAME} as name, {@code "default"} as input stream ID and
     * {@link #DEFAULT_ID} as input component ID.
     *
     * @param bolt the Storm bolt to wrap
     * @param inputSchema schema attached to incoming tuples in standalone mode; may be {@code null}
     * @param rawOutputs raw output names; forwarded unchanged
     * @throws IllegalArgumentException as declared by the delegate constructor
     */
    public BoltWrapper(IRichBolt bolt, Fields inputSchema, Collection<String> rawOutputs) throws IllegalArgumentException {
        this(bolt, DEFUALT_BOLT_NAME, "default", DEFAULT_ID, inputSchema, rawOutputs);
    }

    /**
     * Wraps {@code bolt} with full control over naming and input description.
     *
     * @param bolt the Storm bolt to wrap
     * @param name the name of the bolt
     * @param inputStreamId stream ID reported on incoming tuples in standalone mode
     * @param inputComponentId component ID reported on incoming tuples in standalone mode
     * @param inputSchema schema attached to incoming tuples in standalone mode; may be {@code null}
     * @param rawOutputs passed to {@link WrapperSetupHelper#getNumberOfAttributes}
     *     (presumably the output streams to emit as raw values; confirm against that helper)
     * @throws IllegalArgumentException declared on the signature; presumably raised by
     *     {@link WrapperSetupHelper#getNumberOfAttributes} (not visible here)
     */
    public BoltWrapper(IRichBolt bolt, String name, String inputStreamId, String inputComponentId, Fields inputSchema, Collection<String> rawOutputs) throws IllegalArgumentException {
        this.bolt = bolt;
        this.name = name;
        // The null key is the slot used in standalone mode, where no producer task ID exists.
        this.inputStreamIds.put(null, inputStreamId);
        this.inputComponentIds.put(null, inputComponentId);
        this.inputSchemas.put(null, inputSchema);
        this.numberOfAttributes = WrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
    }

    /**
     * Sets the Storm topology this bolt is part of. A non-null topology switches the wrapper to
     * topology mode; it is read in {@link #open()} and on every processed element.
     *
     * @param stormTopology the enclosing Storm topology, or {@code null} for standalone mode
     */
    public void setStormTopology(StormTopology stormTopology) {
        this.stormTopology = stormTopology;
    }

    /**
     * Creates the Flink collector, derives the Storm configuration from the job's global
     * parameters, builds the topology context, indexes the input streams by producer task ID
     * (topology mode only) and finally calls {@code prepare} on the wrapped bolt.
     *
     * @throws Exception if the superclass or any setup step fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.flinkCollector = new TimestampedCollector<OUT>(this.output);

        // Use the job parameters directly when they already are a StormConfig, copy their map
        // view otherwise, and fall back to an empty configuration when none were set.
        ExecutionConfig.GlobalJobParameters jobParameters = this.getExecutionConfig().getGlobalJobParameters();
        StormConfig stormConfig = new StormConfig();
        if (jobParameters != null) {
            if (jobParameters instanceof StormConfig) {
                stormConfig = (StormConfig)jobParameters;
            } else {
                stormConfig.putAll(jobParameters.toMap());
            }
        }

        this.topologyContext = WrapperSetupHelper.createTopologyContext(this.getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig);
        OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>(this.numberOfAttributes, this.topologyContext.getThisTaskId(), this.flinkCollector));

        if (this.stormTopology != null) {
            Map<GlobalStreamId, Grouping> inputs = this.topologyContext.getThisSources();
            for (GlobalStreamId inputStream : inputs.keySet()) {
                String componentId = inputStream.get_componentId();
                // Entries are keyed by task ID only, so when one component feeds this bolt
                // through several streams the stream iterated last wins for its tasks.
                for (Integer taskId : this.topologyContext.getComponentTasks(componentId)) {
                    this.inputComponentIds.put(taskId, componentId);
                    this.inputStreamIds.put(taskId, inputStream.get_streamId());
                    this.inputSchemas.put(taskId, this.topologyContext.getComponentOutputFields(inputStream));
                }
            }
        }

        this.bolt.prepare(stormConfig, this.topologyContext, stormCollector);
    }

    /**
     * Calls {@code cleanup} on the wrapped bolt.
     */
    @Override
    public void dispose() {
        // NOTE(review): super.dispose() is not invoked here; confirm the base operator has no
        // cleanup of its own that should run as well.
        this.bolt.cleanup();
    }

    /**
     * Wraps the record into a {@link StormTuple} and passes it to the bolt.
     *
     * <p>In topology mode the record is cast to a Flink {@link Tuple} whose last field is read
     * as the producer task ID; that ID selects schema, stream ID and component ID. In standalone
     * mode the values given to the constructor are used together with producer task ID
     * {@code -1}. All tuples are unanchored.
     *
     * @param element the incoming record
     * @throws Exception if the bolt fails while executing the tuple
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        this.flinkCollector.setTimestamp(element);
        IN value = element.getValue();
        if (this.stormTopology != null) {
            Tuple tuple = (Tuple)value;
            Integer producerTaskId = (Integer)tuple.getField(tuple.getArity() - 1);
            this.bolt.execute(new StormTuple<IN>(value, this.inputSchemas.get(producerTaskId), producerTaskId, this.inputStreamIds.get(producerTaskId), this.inputComponentIds.get(producerTaskId), MessageId.makeUnanchored()));
        } else {
            // Report the stream/component IDs configured at construction time rather than
            // null, so bolts that inspect the tuple's source see meaningful values.
            this.bolt.execute(new StormTuple<IN>(value, this.inputSchemas.get(null), -1, this.inputStreamIds.get(null), this.inputComponentIds.get(null), MessageId.makeUnanchored()));
        }
    }

    /**
     * Forwards the watermark unchanged to the operator output; the bolt is not involved.
     *
     * @param mark the watermark to forward
     * @throws Exception if emitting the watermark fails
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        this.output.emitWatermark(mark);
    }
}

