/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.storm.example;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.storm.MessageToValuesMapper;
import org.apache.pulsar.storm.PulsarBolt;
import org.apache.pulsar.storm.PulsarBoltConfiguration;
import org.apache.pulsar.storm.PulsarSpout;
import org.apache.pulsar.storm.PulsarSpoutConfiguration;
import org.apache.pulsar.storm.TupleToMessageMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Example that wires a {@link PulsarSpout} and a {@link PulsarBolt} into a local Storm topology.
 *
 * <p>The spout reads from one topic and emits each payload as a single string field; the bolt
 * appends {@code "-processed"} to that string and publishes it to a second topic. After the
 * topology is submitted, {@link #main(String[])} produces a handful of messages to the first
 * topic and logs whatever arrives on the second one.
 */
public class StormExample {
    // Log under this class, not under PulsarSpout, so output is attributed correctly.
    private static final Logger LOG = LoggerFactory.getLogger(StormExample.class);
    private static final String SERVICE_URL = "http://broker-pdev.messaging.corp.usw.example.com:8080";
    /** Name under which the topology is submitted to, and later killed on, the local cluster. */
    private static final String TOPOLOGY_NAME = "test";
    /** Number of messages produced, and number of receive attempts made afterwards. */
    private static final int MESSAGE_COUNT = 10;

    /** Converts a Pulsar message into a one-field tuple holding the payload decoded as UTF-8. */
    static MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {

        public Values toValues(Message msg) {
            return new Values(new String(msg.getData(), StandardCharsets.UTF_8));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("string"));
        }
    };

    /** Converts a tuple's first string field into a Pulsar message with "-processed" appended. */
    static TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {

        public Message toMessage(Tuple tuple) {
            String receivedMessage = tuple.getString(0);
            String processedMsg = receivedMessage + "-processed";
            return MessageBuilder.create()
                    .setContent(processedMsg.getBytes(StandardCharsets.UTF_8))
                    .build();
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Intentionally empty: this mapper declares no output fields.
        }
    };

    /**
     * Builds the spout/bolt topology, runs it on a {@link LocalCluster}, pushes test messages
     * through it and then tears everything down.
     *
     * @param args ignored
     * @throws PulsarClientException if creating, using or closing the Pulsar client fails
     */
    public static void main(String[] args) throws PulsarClientException {
        ClientConfiguration clientConf = new ClientConfiguration();
        String topic1 = "persistent://my-property/use/my-ns/my-topic1";
        String topic2 = "persistent://my-property/use/my-ns/my-topic2";
        String subscriptionName1 = "my-subscriber-name1";
        String subscriptionName2 = "my-subscriber-name2";

        // Spout: consumes topic1.
        PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
        spoutConf.setServiceUrl(SERVICE_URL);
        spoutConf.setTopic(topic1);
        spoutConf.setSubscriptionName(subscriptionName1);
        spoutConf.setMessageToValuesMapper(messageToValuesMapper);
        PulsarSpout spout = new PulsarSpout(spoutConf, clientConf);

        // Bolt: publishes to topic2.
        PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
        boltConf.setServiceUrl(SERVICE_URL);
        boltConf.setTopic(topic2);
        boltConf.setTupleToMessageMapper(tupleToMessageMapper);
        PulsarBolt bolt = new PulsarBolt(boltConf, clientConf);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("testSpout", spout);
        builder.setBolt("testBolt", bolt).shuffleGrouping("testSpout");

        Config conf = new Config();
        conf.setNumWorkers(2);
        conf.setDebug(true);
        conf.registerMetricsConsumer(PulsarMetricsConsumer.class);

        LocalCluster cluster = new LocalCluster();
        boolean submitted = false;
        try {
            cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
            submitted = true;
            // Pause before producing, as the original example did, so the topology can start.
            Utils.sleep(10000L);
            runRoundTrip(clientConf, topic1, topic2, subscriptionName2);
        } finally {
            // Always release the local cluster, even when the round trip fails.
            if (submitted) {
                cluster.killTopology(TOPOLOGY_NAME);
            }
            cluster.shutdown();
        }
    }

    /**
     * Produces {@link #MESSAGE_COUNT} messages to {@code inputTopic} and then makes the same
     * number of one-second receive attempts on {@code outputTopic}, logging each result.
     *
     * <p>The consumer is subscribed before anything is produced, matching the original ordering.
     * The client, consumer and producer are closed before returning, whether or not an error
     * occurred.
     */
    private static void runRoundTrip(ClientConfiguration clientConf, String inputTopic,
            String outputTopic, String subscriptionName) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.create(SERVICE_URL, clientConf);
        try {
            Consumer consumer = pulsarClient.subscribe(outputTopic, subscriptionName);
            try {
                Producer producer = pulsarClient.createProducer(inputTopic);
                try {
                    for (int i = 0; i < MESSAGE_COUNT; ++i) {
                        String payload = "msg-" + i;
                        producer.send(payload.getBytes(StandardCharsets.UTF_8));
                        LOG.info("Message {} sent", payload);
                    }
                    for (int i = 0; i < MESSAGE_COUNT; ++i) {
                        Message received = consumer.receive(1, TimeUnit.SECONDS);
                        if (received == null) {
                            // Nothing arrived within the timeout; do not dereference.
                            LOG.warn("No message received within 1 second (attempt {})", i);
                            continue;
                        }
                        LOG.info("Message {} received",
                                new String(received.getData(), StandardCharsets.UTF_8));
                    }
                } finally {
                    producer.close();
                }
            } finally {
                consumer.close();
            }
        } finally {
            pulsarClient.close();
        }
    }

    /**
     * No-op metrics consumer registered with the topology configuration.
     *
     * <p>It is registered by {@code Class} object, so it is declared {@code public static} with a
     * public no-argument constructor: it can then be instantiated without an enclosing
     * {@code StormExample} instance.
     */
    public static class PulsarMetricsConsumer
    implements IMetricsConsumer {
        public PulsarMetricsConsumer() {
        }

        public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
            // Nothing to set up.
        }

        public void handleDataPoints(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
            // Data points are intentionally discarded.
        }

        public void cleanup() {
            // Nothing to release.
        }
    }
}

