/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.storm.wrappers;

import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import java.util.Collection;
import java.util.HashMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.shaded.com.google.common.collect.Sets;
import org.apache.flink.storm.util.FiniteSpout;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.storm.wrappers.SpoutCollector;
import org.apache.flink.storm.wrappers.WrapperSetupHelper;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

/**
 * Runs a Storm {@link IRichSpout} as a Flink parallel source function.
 *
 * <p>{@link #run} opens and activates the wrapped spout and then repeatedly calls
 * {@link IRichSpout#nextTuple()}. How long it keeps calling is controlled by the optional
 * {@code numberOfInvocations} constructor argument:
 * <ul>
 *   <li>{@code null}: call until {@link #cancel()} or {@link #stop()} is invoked; if the spout is a
 *       {@link FiniteSpout}, additionally finish once {@code reachedEnd()} returns {@code true};</li>
 *   <li>{@code >= 0}: call at most that many times;</li>
 *   <li>{@code < 0}: call until one invocation leaves the collector's {@code tupleEmitted} flag
 *       unset (presumably: the call emitted no tuple -- confirm against {@code SpoutCollector}).</li>
 * </ul>
 *
 * @param <OUT> type of the records handed to the Flink source context
 */
public final class SpoutWrapper<OUT>
extends RichParallelSourceFunction<OUT>
implements StoppableFunction {
    private static final long serialVersionUID = -218340336648247605L;
    /** Result of {@code WrapperSetupHelper.getNumberOfAttributes} for the wrapped spout. */
    private final HashMap<String, Integer> numberOfAttributes;
    /** The wrapped Storm spout. */
    private final IRichSpout spout;
    /** Name passed to the topology context; {@code null} unless the four-argument constructor is used. */
    private final String name;
    /** Cleared by {@link #cancel()} / {@link #stop()}; volatile because it is polled by the emission loops. */
    private volatile boolean isRunning = true;
    /** Invocation limit for {@code nextTuple()}; see the class comment for the three modes. */
    private final Integer numberOfInvocations;
    /** Optional topology handed to the topology context; stays {@code null} unless set explicitly. */
    private StormTopology stormTopology;

    /**
     * Wraps {@code spout} without raw outputs and without an invocation limit.
     *
     * @param spout the spout to execute
     * @throws IllegalArgumentException declared by the delegate constructor
     */
    public SpoutWrapper(IRichSpout spout) throws IllegalArgumentException {
        this(spout, (Collection<String>)null, null);
    }

    /**
     * Wraps {@code spout} without raw outputs.
     *
     * @param spout the spout to execute
     * @param numberOfInvocations invocation limit, or {@code null} for none
     * @throws IllegalArgumentException declared by the delegate constructor
     */
    public SpoutWrapper(IRichSpout spout, Integer numberOfInvocations) throws IllegalArgumentException {
        this(spout, (Collection<String>)null, numberOfInvocations);
    }

    /**
     * Wraps {@code spout} without an invocation limit.
     *
     * @param spout the spout to execute
     * @param rawOutputs raw output declaration; {@code null} is treated as "none"
     * @throws IllegalArgumentException declared by the delegate constructor
     */
    public SpoutWrapper(IRichSpout spout, String[] rawOutputs) throws IllegalArgumentException {
        this(spout, SpoutWrapper.toOutputSet(rawOutputs), null);
    }

    /**
     * Wraps {@code spout} with an array-based raw output declaration.
     *
     * @param spout the spout to execute
     * @param rawOutputs raw output declaration; {@code null} is treated as "none"
     * @param numberOfInvocations invocation limit, or {@code null} for none
     * @throws IllegalArgumentException declared by the delegate constructor
     */
    public SpoutWrapper(IRichSpout spout, String[] rawOutputs, Integer numberOfInvocations) throws IllegalArgumentException {
        this(spout, SpoutWrapper.toOutputSet(rawOutputs), numberOfInvocations);
    }

    /**
     * Wraps {@code spout} without an invocation limit.
     *
     * @param spout the spout to execute
     * @param rawOutputs raw output declaration, may be {@code null}
     * @throws IllegalArgumentException declared by the delegate constructor
     */
    public SpoutWrapper(IRichSpout spout, Collection<String> rawOutputs) throws IllegalArgumentException {
        this(spout, rawOutputs, null);
    }

    /**
     * Wraps {@code spout} without an explicit name.
     *
     * @param spout the spout to execute
     * @param rawOutputs raw output declaration, may be {@code null}
     * @param numberOfInvocations invocation limit, or {@code null} for none
     * @throws IllegalArgumentException declared by the delegate constructor
     */
    public SpoutWrapper(IRichSpout spout, Collection<String> rawOutputs, Integer numberOfInvocations) throws IllegalArgumentException {
        this(spout, null, rawOutputs, numberOfInvocations);
    }

    /**
     * Primary constructor; all other constructors delegate here.
     *
     * @param spout the spout to execute
     * @param name name forwarded to the topology context, may be {@code null}
     * @param rawOutputs raw output declaration forwarded to
     *        {@code WrapperSetupHelper.getNumberOfAttributes}, may be {@code null}
     * @param numberOfInvocations invocation limit, or {@code null} for none
     * @throws IllegalArgumentException presumably raised by {@code WrapperSetupHelper} for an
     *         unsupported output declaration -- confirm against that helper
     */
    public SpoutWrapper(IRichSpout spout, String name, Collection<String> rawOutputs, Integer numberOfInvocations) throws IllegalArgumentException {
        this.spout = spout;
        this.name = name;
        this.numberOfAttributes = WrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
        this.numberOfInvocations = numberOfInvocations;
    }

    /**
     * Converts the array form of the raw output declaration into the collection form.
     * Returns {@code null} for a {@code null} array, which is the value the constructors
     * without an output declaration pass on.
     */
    private static Collection<String> toOutputSet(String[] rawOutputs) {
        if (rawOutputs == null) {
            return null;
        }
        // Pass the String[] directly so the element type is inferred as String.
        return Sets.newHashSet(rawOutputs);
    }

    /**
     * Sets the topology that {@link #run} hands to the topology context.
     *
     * @param stormTopology the topology, may be {@code null}
     */
    public void setStormTopology(StormTopology stormTopology) {
        this.stormTopology = stormTopology;
    }

    /**
     * Opens and activates the spout, then drives {@code nextTuple()} according to the
     * configured invocation mode (see the class comment).
     *
     * @param ctx Flink source context that the collector emits into
     * @throws Exception propagated from the spout or the setup helpers
     */
    public final void run(SourceFunction.SourceContext<OUT> ctx) throws Exception {
        StormConfig stormConfig = this.resolveStormConfig();
        TopologyContext stormTopologyContext = WrapperSetupHelper.createTopologyContext((StreamingRuntimeContext)super.getRuntimeContext(), this.spout, this.name, this.stormTopology, stormConfig);
        SpoutCollector<OUT> collector = new SpoutCollector<OUT>(this.numberOfAttributes, stormTopologyContext.getThisTaskId(), ctx);
        this.spout.open(stormConfig, stormTopologyContext, new SpoutOutputCollector(collector));
        this.spout.activate();

        if (this.numberOfInvocations == null) {
            this.emitUntilStopped();
            return;
        }
        int limit = this.numberOfInvocations;
        if (limit >= 0) {
            this.emitAtMost(limit);
        } else {
            this.emitWhileProducing(collector);
        }
    }

    /**
     * Builds the Storm configuration from the job's global parameters: a {@link StormConfig}
     * is used as is, any other parameters object is copied into a fresh one, and missing
     * parameters yield an empty configuration.
     */
    private StormConfig resolveStormConfig() {
        ExecutionConfig.GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        if (config instanceof StormConfig) {
            return (StormConfig)config;
        }
        StormConfig stormConfig = new StormConfig();
        if (config != null) {
            stormConfig.putAll(config.toMap());
        }
        return stormConfig;
    }

    /** Unlimited mode: runs until stopped, or until a {@link FiniteSpout} reports its end. */
    private void emitUntilStopped() {
        if (this.spout instanceof FiniteSpout) {
            FiniteSpout finiteSpout = (FiniteSpout)this.spout;
            while (this.isRunning && !finiteSpout.reachedEnd()) {
                finiteSpout.nextTuple();
            }
            return;
        }
        while (this.isRunning) {
            this.spout.nextTuple();
        }
    }

    /** Bounded mode: calls {@code nextTuple()} at most {@code limit} times, or fewer if stopped. */
    private void emitAtMost(int limit) {
        int remaining = limit;
        while (--remaining >= 0 && this.isRunning) {
            this.spout.nextTuple();
        }
    }

    /**
     * Negative-limit mode: always performs one call, then continues only while the previous
     * call set the collector's {@code tupleEmitted} flag and the wrapper is still running.
     */
    private void emitWhileProducing(SpoutCollector<OUT> collector) {
        do {
            // Reset before each call so the flag reflects only this invocation.
            collector.tupleEmitted = false;
            this.spout.nextTuple();
        } while (collector.tupleEmitted && this.isRunning);
    }

    /** Requests termination of the emission loop in {@link #run}. */
    public void cancel() {
        this.isRunning = false;
    }

    /** Requests termination of the emission loop in {@link #run}; same effect as {@link #cancel()}. */
    public void stop() {
        this.isRunning = false;
    }

    /**
     * Closes the wrapped spout.
     *
     * @throws Exception declared by the overridden contract
     */
    public void close() throws Exception {
        this.spout.close();
    }
}

