package org.apache.storm.kafka.spout.test;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/* loaded from: input_file:org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.class */
public class KafkaSpoutTopologyMainNamedTopics {
    private static final String TOPIC_2_STREAM = "test_2_stream";
    private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
    private static final String[] TOPICS = {SingleTopicKafkaSpoutConfiguration.TOPIC, "test1", "test2"};
    public static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_PART_OFF_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() { // from class: org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMainNamedTopics.1
        public List<Object> apply(ConsumerRecord<String, String> consumerRecord) {
            return new Values(new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});
        }
    };

    public static void main(String[] strArr) throws Exception {
        new KafkaSpoutTopologyMainNamedTopics().runMain(strArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void runMain(String[] strArr) throws Exception {
        if (strArr.length == 0) {
            submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig());
        } else {
            submitTopologyRemoteCluster(strArr[0], getTopologyKafkaSpout(), getConfig());
        }
    }

    protected void submitTopologyLocalCluster(StormTopology stormTopology, Config config) throws InterruptedException {
        new LocalCluster().submitTopology(SingleTopicKafkaSpoutConfiguration.TOPIC, config, stormTopology);
        stopWaitingForInput();
    }

    protected void submitTopologyRemoteCluster(String str, StormTopology stormTopology, Config config) throws Exception {
        StormSubmitter.submitTopology(str, config, stormTopology);
    }

    protected void stopWaitingForInput() {
        try {
            System.out.println("PRESS ENTER TO STOP");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
            System.exit(0);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    protected Config getConfig() {
        Config config = new Config();
        config.setDebug(true);
        return config;
    }

    protected StormTopology getTopologyKafkaSpout() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("kafka_spout", new KafkaSpout(getKafkaSpoutConfig()), 1);
        topologyBuilder.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_0_1_STREAM).shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
        topologyBuilder.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_2_STREAM);
        return topologyBuilder.createTopology();
    }

    protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {
        ByTopicRecordTranslator byTopicRecordTranslator = new ByTopicRecordTranslator(TOPIC_PART_OFF_KEY_VALUE_FUNC, new Fields(new String[]{"topic", "partition", "offset", "key", "value"}), TOPIC_0_1_STREAM);
        byTopicRecordTranslator.forTopic(TOPICS[2], TOPIC_PART_OFF_KEY_VALUE_FUNC, new Fields(new String[]{"topic", "partition", "offset", "key", "value"}), TOPIC_2_STREAM);
        return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPICS).setGroupId("kafkaSpoutTestGroup").setRetry(getRetryService()).setRecordTranslator(byTopicRecordTranslator).setOffsetCommitPeriodMs(10000L).setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST).setMaxUncommittedOffsets(250).build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500L), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2L), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10L));
    }
}
