package org.apache.storm.perf;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringMultiSchemeWithTopic;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.perf.utils.Helper;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;

/* loaded from: input_file:org/apache/storm/perf/KafkaHdfsTopo.class */
public class KafkaHdfsTopo {
    public static final String SPOUT_NUM = "spout.count";
    public static final String BOLT_NUM = "bolt.count";
    public static final String KAFKA_TOPIC = "kafka.topic";
    public static final String ZOOKEEPER_URI = "zk.uri";
    public static final String HDFS_URI = "hdfs.uri";
    public static final String HDFS_PATH = "hdfs.dir";
    public static final String HDFS_BATCH = "hdfs.batch";
    public static final int DEFAULT_SPOUT_NUM = 1;
    public static final int DEFAULT_BOLT_NUM = 1;
    public static final int DEFAULT_HDFS_BATCH = 1000;
    public static final String TOPOLOGY_NAME = "KafkaHdfsTopo";
    public static final String SPOUT_ID = "kafkaSpout";
    public static final String BOLT_ID = "hdfsBolt";

    /* loaded from: input_file:org/apache/storm/perf/KafkaHdfsTopo$LineWriter.class */
    public static class LineWriter implements RecordFormat {
        private String lineDelimiter = System.lineSeparator();
        private String fieldName;

        public LineWriter(String str) {
            this.fieldName = str;
        }

        public LineWriter withLineDelimiter(String str) {
            this.lineDelimiter = str;
            return this;
        }

        public byte[] format(Tuple tuple) {
            return (tuple.getValueByField(this.fieldName).toString() + this.lineDelimiter).getBytes();
        }
    }

    public static StormTopology getTopology(Map map) {
        int i = getInt(map, "spout.count", 1);
        int i2 = getInt(map, "bolt.count", 1);
        int i3 = getInt(map, "hdfs.batch", 1000);
        String str = getStr(map, "zk.uri");
        String str2 = getStr(map, "kafka.topic");
        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(str), str2, "/" + str2, UUID.randomUUID().toString());
        spoutConfig.scheme = new StringMultiSchemeWithTopic();
        spoutConfig.ignoreZkOffsets = true;
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        String str3 = getStr(map, "hdfs.uri");
        HdfsBolt withSyncPolicy = new HdfsBolt().withFsUrl(str3).withFileNameFormat(new DefaultFileNameFormat().withPath(getStr(map, "hdfs.dir"))).withRecordFormat(new LineWriter("str")).withRotationPolicy(new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.GB)).withSyncPolicy(new CountSyncPolicy(i3));
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("kafkaSpout", kafkaSpout, Integer.valueOf(i));
        topologyBuilder.setBolt("hdfsBolt", withSyncPolicy, Integer.valueOf(i2)).localOrShuffleGrouping("kafkaSpout");
        return topologyBuilder.createTopology();
    }

    public static int getInt(Map map, Object obj, int i) {
        return Utils.getInt(Utils.get(map, obj, Integer.valueOf(i))).intValue();
    }

    public static String getStr(Map map, Object obj) {
        return (String) map.get(obj);
    }

    public static void main(String[] strArr) throws Exception {
        if (strArr.length != 2) {
            System.err.println("args: runDurationSec topConfFile");
            return;
        }
        Integer valueOf = Integer.valueOf(Integer.parseInt(strArr[0]));
        Map findAndReadConfigFile = Utils.findAndReadConfigFile(strArr[1]);
        Helper.runOnClusterAndPrintMetrics(valueOf, TOPOLOGY_NAME, findAndReadConfigFile, getTopology(findAndReadConfigFile));
    }
}
