package org.apache.flink.storm.split;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.split.operators.RandomSpout;
import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
import org.apache.flink.storm.util.SplitStreamMapper;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormStreamSelector;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.storm.topology.IRichSpout;

/* loaded from: input_file:org/apache/flink/storm/split/SpoutSplitExample.class */
public class SpoutSplitExample {
    private static long seed = System.currentTimeMillis();
    private static String outputPath = null;

    /* loaded from: input_file:org/apache/flink/storm/split/SpoutSplitExample$Enrich.class */
    public static final class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 5213888269197438892L;
        private final Tuple2<String, Integer> out;
        private final boolean isEven;
        public static boolean errorOccured = false;

        public Enrich(boolean z) {
            this.isEven = z;
            if (z) {
                this.out = new Tuple2<>(RandomSpout.EVEN_STREAM, 0);
            } else {
                this.out = new Tuple2<>(RandomSpout.ODD_STREAM, 0);
            }
        }

        public Tuple2<String, Integer> map(Integer num) throws Exception {
            if ((num.intValue() % 2 == 0) != this.isEven) {
                errorOccured = true;
            }
            this.out.setField(num, 1);
            return this.out;
        }
    }

    public static void main(String[] strArr) throws Exception {
        boolean parseParameters = parseParameters(strArr);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SplitStream split = executionEnvironment.addSource(new SpoutWrapper((IRichSpout) new RandomSpout(true, seed), new String[]{RandomSpout.EVEN_STREAM, RandomSpout.ODD_STREAM}, (Integer) 1000), TypeExtractor.getForObject(new SplitStreamType())).split(new StormStreamSelector());
        DataStream select = split.select(new String[]{RandomSpout.EVEN_STREAM});
        DataStream select2 = split.select(new String[]{RandomSpout.ODD_STREAM});
        SingleOutputStreamOperator map = select.map(new SplitStreamMapper()).returns(Integer.class).map(new Enrich(true));
        SingleOutputStreamOperator transform = select2.map(new SplitStreamMapper()).transform("oddBolt", TypeExtractor.getForObject(new Tuple2("", 0)), new BoltWrapper(new VerifyAndEnrichBolt(false)));
        if (parseParameters) {
            map.writeAsText(outputPath + "/even");
            transform.writeAsText(outputPath + "/odd");
        } else {
            map.print();
            transform.print();
        }
        executionEnvironment.execute("Spout split stream example");
    }

    static boolean parseParameters(String[] strArr) {
        if (strArr.length <= 0) {
            System.out.println("Executing SplitBoltTopology example with random data");
            System.out.println("  Usage: SplitStreamBoltLocal <seed> <result path>");
            return false;
        }
        if (strArr.length != 2) {
            throw new IllegalArgumentException("Usage: SplitStreamBoltLocal <seed> <result path>");
        }
        seed = Long.parseLong(strArr[0]);
        outputPath = strArr[1];
        return true;
    }
}
