package org.apache.flink.storm.join;

import backtype.storm.Config;
import backtype.storm.drpc.PrepareRequest;
import backtype.storm.testing.FeederSpout;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.NullTerminatingSpout;
import org.apache.flink.storm.util.TupleOutputFormatter;
import storm.starter.bolt.PrinterBolt;
import storm.starter.bolt.SingleJoinBolt;

/* loaded from: input_file:org/apache/flink/storm/join/SingleJoinExample.class */
public class SingleJoinExample {
    public static void main(String[] strArr) throws Exception {
        FeederSpout feederSpout = new FeederSpout(new Fields(PrepareRequest.ID_STREAM, "gender", "hobbies"));
        FeederSpout feederSpout2 = new FeederSpout(new Fields(PrepareRequest.ID_STREAM, "age"));
        Config config = new Config();
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        config.put(FlinkLocalCluster.SUBMIT_BLOCKING, true);
        NullTerminatingSpout nullTerminatingSpout = new NullTerminatingSpout(feederSpout);
        NullTerminatingSpout nullTerminatingSpout2 = new NullTerminatingSpout(feederSpout2);
        topologyBuilder.setSpout("gender", nullTerminatingSpout);
        topologyBuilder.setSpout("age", nullTerminatingSpout2);
        topologyBuilder.setBolt("join", (IRichBolt) new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields(PrepareRequest.ID_STREAM)).fieldsGrouping("age", new Fields(PrepareRequest.ID_STREAM));
        if (strArr.length > 0) {
            topologyBuilder.setBolt("fileOutput", new BoltFileSink(strArr[0], new TupleOutputFormatter())).shuffleGrouping("join");
        } else {
            topologyBuilder.setBolt("print", (IBasicBolt) new PrinterBolt()).shuffleGrouping("join");
        }
        String[] strArr2 = {"reading", "biking", "travelling", "watching tv"};
        for (int i = 0; i < 10; i++) {
            feederSpout.feed(new Values(Integer.valueOf(i), i % 2 == 0 ? "male" : "female", strArr2[i % strArr2.length]));
        }
        for (int i2 = 9; i2 >= 0; i2--) {
            feederSpout2.feed(new Values(Integer.valueOf(i2), Integer.valueOf(i2 + 20)));
        }
        FlinkLocalCluster localCluster = FlinkLocalCluster.getLocalCluster();
        localCluster.submitTopology("joinTopology", config, FlinkTopology.createTopology(topologyBuilder));
        localCluster.shutdown();
    }
}
