package org.apache.storm.kafka.trident;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import kafka.api.OffsetRequest;
import org.apache.hadoop.fs.shell.Test;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;

/* loaded from: input_file:org/apache/storm/kafka/trident/TridentKafkaWordCount.class */
/**
 * Command-line driver that submits two topologies: a Kafka producer topology and a Trident
 * consumer topology reading the same topic through a {@link TransactionalTridentKafkaSpout},
 * then prints DRPC results for a fixed period.
 *
 * <p>Usage: {@code TridentKafkaWordCount [kafka zookeeper url] [kafka broker url] [topology name]}.
 * When all three arguments are supplied the topologies are submitted through
 * {@link StormSubmitter}; otherwise they are submitted through a {@link LocalSubmitter}.
 */
public class TridentKafkaWordCount implements Serializable {
    /** ZooKeeper URL used when the first command-line argument is absent. */
    private static final String DEFAULT_ZK_URL = "localhost:2181";

    /** Kafka broker URL used when the second command-line argument is absent. */
    private static final String DEFAULT_BROKER_URL = "localhost:9092";

    /** Name under which the producer topology is submitted in the local path. */
    private static final String LOCAL_PRODUCER_NAME = "kafkaBolt";

    /** Name under which the consumer topology is submitted in the local path. */
    private static final String LOCAL_CONSUMER_NAME = "wordCounter";

    /**
     * Kafka topic shared by the producer and the consumer.
     * NOTE(review): referencing Hadoop's shell {@code Test.NAME} looks like a decompiler
     * artifact standing in for a plain string literal - confirm and replace with the literal.
     */
    private static final String TOPIC = Test.NAME;

    /**
     * Entry point.
     *
     * @param strArr optional {@code [zookeeper url] [broker url] [topology name]}
     * @throws Exception if submitting, running or tearing down a topology fails
     */
    public static void main(String[] strArr) throws Exception {
        String[] urls = parseUrl(strArr);
        String zkUrl = urls[0];
        String brokerUrl = urls[1];
        Config conf = LocalSubmitter.defaultConfig();

        if (strArr.length == 3) {
            submitRemotely(strArr[2], conf, zkUrl, brokerUrl);
        } else {
            runLocally(conf, zkUrl, brokerUrl);
        }
    }

    /** Submits "&lt;name&gt;-producer" and "&lt;name&gt;-consumer" via StormSubmitter and prints remote DRPC results. */
    private static void submitRemotely(String name, Config conf, String zkUrl, String brokerUrl) throws Exception {
        StormSubmitter.submitTopology(name + "-producer", conf,
                KafkaProducerTopology.newTopology(brokerUrl, TOPIC));
        StormSubmitter.submitTopology(name + "-consumer", conf,
                TridentKafkaConsumerTopology.newTopology(
                        new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkUrl))));
        // Pause before the first DRPC query is issued.
        Thread.sleep(2000L);
        DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
    }

    /** Submits both topologies through a LocalSubmitter, prints DRPC results, and always tears down. */
    private static void runLocally(Config conf, String zkUrl, String brokerUrl) throws Exception {
        LocalSubmitter submitter = LocalSubmitter.newInstance();
        try {
            submitter.submit(LOCAL_PRODUCER_NAME, conf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC));
            submitter.submit(LOCAL_CONSUMER_NAME, conf,
                    TridentKafkaConsumerTopology.newTopology(
                            submitter.getDrpc(),
                            new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkUrl))));
            new DrpcResultsPrinter(submitter.getDrpc()).printResults(60, 1, TimeUnit.SECONDS);
        } finally {
            // Single cleanup path: runs exactly once whether or not the body threw.
            submitter.kill(LOCAL_PRODUCER_NAME);
            submitter.kill(LOCAL_CONSUMER_NAME);
            submitter.shutdown();
        }
    }

    /**
     * Resolves the ZooKeeper and broker URLs from the command line, falling back to the
     * localhost defaults for any argument that is not supplied. Prints usage and calls
     * {@code System.exit(1)} when more than three arguments are given or when the sole
     * argument is {@code -h} or {@code --help}.
     *
     * @param strArr raw command-line arguments
     * @return a two-element array: {@code [zookeeper url, broker url]}
     */
    private static String[] parseUrl(String[] strArr) {
        boolean helpRequested = strArr.length == 1
                && ("-h".equals(strArr[0]) || "--help".equals(strArr[0]));
        if (strArr.length > 3 || helpRequested) {
            System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url] [topology name]");
            System.out.println("   E.g TridentKafkaWordCount [" + DEFAULT_ZK_URL + "] [" + DEFAULT_BROKER_URL + "] [wordcount]");
            System.exit(1);
        }

        // Positional arguments are honoured whenever present, including the three-argument
        // form where a topology name follows the two URLs.
        String zkUrl = strArr.length >= 1 ? strArr[0] : DEFAULT_ZK_URL;
        String brokerUrl = strArr.length >= 2 ? strArr[1] : DEFAULT_BROKER_URL;

        System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl);
        return new String[]{zkUrl, brokerUrl};
    }

    /**
     * Builds the spout configuration for {@link #TOPIC} on the given ZooKeeper hosts, using a
     * {@link StringScheme} wrapped in a {@link SchemeAsMultiScheme} and a start offset of
     * {@code OffsetRequest.LatestTime()}.
     *
     * @param str Kafka ZooKeeper connect string
     * @return the populated configuration
     */
    private static TridentKafkaConfig newTridentKafkaConfig(String str) {
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(new ZkHosts(str), TOPIC);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafkaConfig.startOffsetTime = OffsetRequest.LatestTime();
        return kafkaConfig;
    }
}
